Phase 1 - SQLAlchemy 2.0 Compatibility

One of the major changes in SQLAlchemy 2.0 is the removal
of autocommit support. It turns out Ironic was relying on it quite
heavily without even really being aware of it.

* Moved ``declarative_base`` to the ORM namespace, as noted in the
  SQLAlchemy 2.0 migration notes[0].
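  The import itself simply moves to the ORM namespace (the new
  location is already available in SQLAlchemy 1.4)::

    # Old location, deprecated:
    # from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()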

* Console testing made us aware of issues around locking, where
  session synchronization defaulted to False while autocommit was
  enabled. As a result, two sessions on different threads could see
  different results, and one could still attempt to take a lock based
  upon stale information. While this basically worked, it was also
  subtly broken behavior, so locking has been rewritten to use the
  style mandated in the SQLAlchemy 2.0 migration documentation. This
  matters because locking is *heavily* relied upon in Ironic, and
  because unit testing with sqlite runs without real transactions[1],
  so we can also see data inconsistency in unit testing if we rely
  upon the database to return precisely and exactly what we
  committed.
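  A rough sketch of the rewritten lock acquisition, mirroring the
  ``_reserve_node_place_lock`` helper added in this change (``sa`` is
  the ``sqlalchemy`` module)::

    with _session_for_write() as session:
        res = session.execute(
            sa.update(models.Node)
            .where(models.Node.id == node.id)
            .where(models.Node.reservation == None)  # noqa
            .values(reservation=tag)
            .execution_options(synchronize_session=False))
        session.flush()
        if res.rowcount != 1:
            # Nothing was updated, so the node must already be locked.
            raise exception.NodeLocked(node=node.uuid,
                                       host=node.reservation)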

* Begins changing the ``query.one()``/``query.all()`` style to use
  explicit select statements, as part of the new style mandated for
  migration to SQLAlchemy 2.0.
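  For example, a lookup which previously relied on the legacy Query
  API::

    node = model_query(models.Node).filter_by(uuid=node_uuid).one()

  now takes roughly this shape with the unified interface (sketch;
  ``sa`` is the ``sqlalchemy`` module)::

    with _session_for_read() as session:
        node = session.scalars(
            sa.select(models.Node).filter_by(uuid=node_uuid).limit(1)
        ).unique().one()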

* Instead of using field label strings for joined queries, use the
  object attribute format, which is much clearer and is one of the
  items required for the eventual migration to 2.0.
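  In practice this means loader options reference mapped attributes
  instead of strings, e.g.::

    # String labels are deprecated and removed in SQLAlchemy 2.0:
    query.options(joinedload('traits'))
    # Object/attribute form:
    query.options(joinedload(models.Node.traits))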

* DB queries involving traits are now loaded using ``selectinload``
  as opposed to joined loads. The now-deprecated ORM queries were
  quietly and silently de-duplicating rows to provide consistent sets
  from the joined result sets, at the cost of much higher CPU load on
  the client while processing results. Prior performance testing
  suggests this should have a minimal overhead impact, and these
  queries should no longer be held in transactions with the database
  servers, which should offset the shift in load pattern. The reason
  we cannot continue to de-duplicate locally in our code is that the
  rows carry dict data, which cannot be hashed for de-duplication.
  Most projects have handled this by treating such columns as text
  and converting, but short of a massive rewrite this seems to be the
  viable middle ground.
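  The list query now looks roughly like the ``_get_node_select``
  helper in this change::

    return (sa.select(models.Node)
            .options(selectinload(models.Node.tags),
                     selectinload(models.Node.traits)))

  which emits one SELECT for the nodes plus one ``WHERE node_id IN
  (...)`` SELECT per relationship, instead of a single joined result
  set which must be de-duplicated on the client.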

* Adds an explicit mapping for ``traits`` and ``tags`` on the Node
  object which points directly to the NodeTrait and NodeTag classes.
  This supersedes the prior usage of a backref to make the
  association.
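  The relationship is now declared directly on the model, roughly as
  follows (requires ``typing.List`` and ``sqlalchemy.orm``)::

    class Node(NodeBase):
        """Represents a bare metal node."""

        traits: orm.Mapped[List['NodeTrait']] = orm.relationship(
            "NodeTrait", back_populates="node", lazy="selectin")
        tags: orm.Mapped[List['NodeTag']] = orm.relationship(
            "NodeTag", back_populates="node", lazy="selectin")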

* Splits the SQLAlchemy model ``Node`` into ``Node`` and ``NodeBase``,
  which allows high performance queries to skip querying for ``tags``
  and ``traits``. Otherwise the aforementioned lookups would always
  execute, as they are now relationship properties on the ``Node``
  class. This is a more common SQLAlchemy model layout, but Ironic's
  model has been a bit more rigid to date.
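  Illustratively, a query which does not need ``tags`` or ``traits``
  can select the base model and skip those loads entirely::

    # Full composite object; triggers the selectin loads:
    sa.select(models.Node)
    # Column-level access only; no extra relationship queries:
    sa.select(models.NodeBase)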

* Adds ``start_consoles`` and ``start_allocations`` options to the
  conductor ``init_host`` method. This allows unit tests to be
  executed and launched with the service context while *not* creating
  the race conditions which previously resulted in failed tests.

* The db API ``_paginate_query`` wrapper now contains additional
  logic to handle both traditional ORM query responses and the newer
  style of unified query responses. The differences in queries and
  handling were also part of the driver for the creation of
  ``NodeBase``, as SQLAlchemy will only construct a full object when
  the base object is referenced. Also, by default, everything
  returned through the unified interface in 1.4 is a row tuple.
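  Roughly, the wrapper now branches on the query type (sketch of the
  logic in ``_paginate_query``; ``session`` is the enclosing read
  session)::

    if isinstance(query, sa_orm.Query):
        # Legacy ORM query object, deprecated ahead of SQLAlchemy 2.0.
        return query.all()
    res = session.execute(query).fetchall()
    if return_base_tuple:
        # Callers selecting specific columns get the raw row tuples.
        return res
    # Otherwise unwrap the single mapped object from each row.
    return [r[0] for r in res]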

* Also modifies one unit test which counted ``time.sleep`` calls, a
  known pattern which can create failures which are ultimately noise.

Ultimately, I have labelled the remaining places where SQLAlchemy
deprecation/removal warnings are raised, which still need to be
addressed.

[0] https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
[1] https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#transaction-isolation-level-autocommit

Change-Id: Ie0f4b8a814eaef1e852088d12d33ce1eab408e23
Julia Kreger 2022-09-07 09:58:57 -07:00
parent 1435a15ce3
commit 49e085583d
15 changed files with 314 additions and 149 deletions


@@ -88,10 +88,14 @@ class BaseConductorManager(object):
         # clear all locks held by this conductor before registering
         self.dbapi.clear_node_reservations_for_conductor(self.host)
 
-    def init_host(self, admin_context=None):
+    def init_host(self, admin_context=None, start_consoles=True,
+                  start_allocations=True):
         """Initialize the conductor host.
 
         :param admin_context: the admin context to pass to periodic tasks.
+        :param start_consoles: If consoles should be started in intialization.
+        :param start_allocations: If allocations should be started in
+                                  initialization.
         :raises: RuntimeError when conductor is already running.
         :raises: NoDriversLoaded when no drivers are enabled on the conductor.
         :raises: DriverNotFound if a driver is enabled that does not exist.
@@ -189,8 +193,9 @@ class BaseConductorManager(object):
         # Start consoles if it set enabled in a greenthread.
         try:
-            self._spawn_worker(self._start_consoles,
-                               ironic_context.get_admin_context())
+            if start_consoles:
+                self._spawn_worker(self._start_consoles,
+                                   ironic_context.get_admin_context())
         except exception.NoFreeConductorWorker:
             LOG.warning('Failed to start worker for restarting consoles.')
@@ -207,8 +212,9 @@ class BaseConductorManager(object):
         # Resume allocations that started before the restart.
         try:
-            self._spawn_worker(self._resume_allocations,
-                               ironic_context.get_admin_context())
+            if start_allocations:
+                self._spawn_worker(self._resume_allocations,
+                                   ironic_context.get_admin_context())
         except exception.NoFreeConductorWorker:
             LOG.warning('Failed to start worker for resuming allocations.')
@ -539,6 +545,7 @@ class BaseConductorManager(object):
try: try:
with task_manager.acquire(context, node_uuid, shared=False, with task_manager.acquire(context, node_uuid, shared=False,
purpose='start console') as task: purpose='start console') as task:
notify_utils.emit_console_notification( notify_utils.emit_console_notification(
task, 'console_restore', task, 'console_restore',
obj_fields.NotificationStatus.START) obj_fields.NotificationStatus.START)


@@ -2203,18 +2203,16 @@ class ConductorManager(base_manager.BaseConductorManager):
         """
         LOG.debug('RPC set_console_mode called for node %(node)s with '
                   'enabled %(enabled)s', {'node': node_id, 'enabled': enabled})
-        with task_manager.acquire(context, node_id, shared=True,
+        with task_manager.acquire(context, node_id, shared=False,
                                   purpose='setting console mode') as task:
             node = task.node
             task.driver.console.validate(task)
             if enabled == node.console_enabled:
                 op = 'enabled' if enabled else 'disabled'
                 LOG.info("No console action was triggered because the "
                          "console is already %s", op)
             else:
-                task.upgrade_lock()
                 node.last_error = None
                 node.save()
                 task.spawn_after(self._spawn_worker,
@ -3469,7 +3467,6 @@ class ConductorManager(base_manager.BaseConductorManager):
self.conductor.id): self.conductor.id):
# Another conductor has taken over, skipping # Another conductor has taken over, skipping
continue continue
LOG.debug('Taking over allocation %s', allocation.uuid) LOG.debug('Taking over allocation %s', allocation.uuid)
allocations.do_allocate(context, allocation) allocations.do_allocate(context, allocation)
except Exception: except Exception:


@@ -13,6 +13,4 @@
 from oslo_db.sqlalchemy import enginefacade
 
 # NOTE(dtantsur): we want sqlite as close to a real database as possible.
-# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP
-# since it's not compatible with SQLAlchemy 2.0
-enginefacade.configure(sqlite_fk=True, __autocommit=True)
+enginefacade.configure(sqlite_fk=True)


@@ -19,9 +19,11 @@ import datetime
 import json
 import threading
 
+from oslo_concurrency import lockutils
 from oslo_db import api as oslo_db_api
 from oslo_db import exception as db_exc
 from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import orm as sa_orm
 from oslo_db.sqlalchemy import utils as db_utils
 from oslo_log import log
 from oslo_utils import netutils
@@ -53,6 +55,10 @@ LOG = log.getLogger(__name__)
 
 _CONTEXT = threading.local()
 
+RESERVATION_SEMAPHORE = "reserve_node_db_lock"
+
+synchronized = lockutils.synchronized_with_prefix('ironic-')
+
 # NOTE(mgoddard): We limit the number of traits per node to 50 as this is the
 # maximum number of traits per resource provider allowed in placement.
 MAX_TRAITS_PER_NODE = 50
@ -83,6 +89,11 @@ def _wrap_session(session):
def _get_node_query_with_all_for_single_node(): def _get_node_query_with_all_for_single_node():
"""Return a query object for the Node joined with all relevant fields. """Return a query object for the Node joined with all relevant fields.
Deprecated: This method, while useful, returns a "Legacy Query" object
which, while useful is considered a legacy object from SQLAlchemy
which at some point may be removed. SQLAlchemy encourages all users
to move to the unified ORM/Core Select interface.
This method utilizes a joined load query which creates a result set This method utilizes a joined load query which creates a result set
where corresponding traits, and tags, are joined together in the result where corresponding traits, and tags, are joined together in the result
set. set.
@ -109,9 +120,10 @@ def _get_node_query_with_all_for_single_node():
Where joins are super in-efficent for Ironic, is where nodes are being Where joins are super in-efficent for Ironic, is where nodes are being
enumerated, as the above result set pattern is not just for one node, but enumerated, as the above result set pattern is not just for one node, but
potentially thousands of nodes. In that case, we should use the potentially thousands of nodes. Please consider using _get_node_select
_get_node_query_with_all_for_list helper to return a more appropriate which results in a primary query for the nodes, and then performs
query object which will be more efficient for the end user. additional targeted queries for the joined tables, as opposed to
performing client side de-duplication.
:returns: a query object. :returns: a query object.
""" """
@ -131,45 +143,33 @@ def _get_node_query_with_all_for_single_node():
.options(joinedload('traits'))) .options(joinedload('traits')))
def _get_node_query_with_all_for_list(): def _get_node_select():
"""Return a query object for the Node with queried extra fields. """Returns a SQLAlchemy Select Object for Nodes.
This method returns a query object joining tags and traits in a pattern This method returns a pre-formatted select object which models
where the result set is first built, and then the resulting associations the entire Node object, allowing callers to operate on a node like
are queried separately and the objects are reconciled by SQLAlchemy to they would have with an SQLAlchemy ORM Query Object.
build the composite objects based upon the associations.
This results in the following query pattern when the query is executed: This object *also* performs two additional select queries, in the form
of a selectin operation, to achieve the same results of a Join query,
but without the join query itself, and the client side load.
select $fields from nodes where x; This method is best utilized when retrieving lists of nodes.
# SQLAlchemy creates a list of associated node IDs.
select $fields from tags where node_id in ('1', '3', '37268');
select $fields from traits where node_id in ('1', '3', '37268');
SQLAlchemy then returns a result set where the tags and traits are Select objects in this fashion were added as a result of SQLAlchemy 1.4
composited together efficently as opposed to having to deduplicate in preparation for SQLAlchemy 2.0's release to provide a unified
the result set. This shifts additional load to the database which select interface.
was previously a high overhead operation with-in the conductor...
which results in a slower conductor.
:returns: a query object. :returns: a select object
""" """
# NOTE(TheJulia): When comparing CI rubs *with* this being the default
# for all general list operations, at 10k nodes, this pattern appears
# to be on-par with a 5% variability between the two example benchmark
# tests. That being said, the test *does* not include tags or traits
# in it's test data set so client side deduplication is not measured.
# NOTE(TheJulia): Basic benchmark difference # NOTE(TheJulia): This returns a query in the SQLAlchemy 1.4->2.0
# tests data creation: 67.117 seconds # migration style as query model loading is deprecated.
# 2.32 seconds to obtain all nodes from SQLAlchemy (10k nodes)
# 4.99 seconds to obtain all nodes *and* have node objects (10k nodes) # This must use selectinload to avoid later need to invokededuplication.
# If this holds true, the required record deduplication with joinedload return (sa.select(models.Node)
# may be basically the same amount of overhead as requesting the tags .options(selectinload(models.Node.tags),
# and traits separately. selectinload(models.Node.traits)))
return (model_query(models.Node)
.options(selectinload('tags'))
.options(selectinload('traits')))
def _get_deploy_template_query_with_steps(): def _get_deploy_template_query_with_steps():
@@ -332,8 +332,10 @@ def add_allocation_filter_by_conductor(query, value):
 
 def _paginate_query(model, limit=None, marker=None, sort_key=None,
-                    sort_dir=None, query=None):
-    if not query:
+                    sort_dir=None, query=None, return_base_tuple=False):
+    # NOTE(TheJulia): We can't just ask for the bool of query if it is
+    # populated, so we need to ask if it is None.
+    if query is None:
         query = model_query(model)
     sort_keys = ['id']
     if sort_key and sort_key not in sort_keys:
@@ -345,7 +347,28 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
         raise exception.InvalidParameterValue(
             _('The sort_key value "%(key)s" is an invalid field for sorting')
             % {'key': sort_key})
-    return query.all()
+    with _session_for_read() as session:
+        # NOTE(TheJulia): SQLAlchemy 2.0 no longer returns pre-uniqued result
+        # sets in ORM mode, so we need to explicitly ask for it to be unique
+        # before returning it to the caller.
+        if isinstance(query, sa_orm.Query):
+            # The classic ORM query object result set which is deprecated
+            # in advance of SQLAlchemy 2.0.
+            return query.all()
+        else:
+            # In this case, we have a sqlalchemy.sql.selectable.Select
+            # (most likely) which utilizes the unified select interface.
+            res = session.execute(query).fetchall()
+            if len(res) == 0:
+                # Return an empty list instead of a class with no objects.
+                return []
+            if return_base_tuple:
+                # The caller expects a tuple, lets just give it to them.
+                return res
+            # Everything is a tuple in a resultset from the unified interface
+            # but for objects, our model expects just object access,
+            # so we extract and return them.
+            return [r[0] for r in res]
 
 
 def _filter_active_conductors(query, interval=None):
@ -514,15 +537,16 @@ class Connection(api.Connection):
else: else:
columns = [getattr(models.Node, c) for c in columns] columns = [getattr(models.Node, c) for c in columns]
query = model_query(*columns, base_model=models.Node) query = sa.select(*columns)
query = self._add_nodes_filters(query, filters) query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker, return _paginate_query(models.Node, limit, marker,
sort_key, sort_dir, query) sort_key, sort_dir, query,
return_base_tuple=True)
def get_node_list(self, filters=None, limit=None, marker=None, def get_node_list(self, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None, fields=None): sort_key=None, sort_dir=None, fields=None):
if not fields: if not fields:
query = _get_node_query_with_all_for_list() query = _get_node_select()
query = self._add_nodes_filters(query, filters) query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker, return _paginate_query(models.Node, limit, marker,
sort_key, sort_dir, query) sort_key, sort_dir, query)
@ -559,24 +583,25 @@ class Connection(api.Connection):
# with SQLAlchemy. # with SQLAlchemy.
traits_found = True traits_found = True
use_columns.remove('traits') use_columns.remove('traits')
# Generate the column object list so SQLAlchemy only fulfills # Generate the column object list so SQLAlchemy only fulfills
# the requested columns. # the requested columns.
use_columns = [getattr(models.Node, c) for c in use_columns] use_columns = [getattr(models.Node, c) for c in use_columns]
# In essence, traits (and anything else needed to generate the # In essence, traits (and anything else needed to generate the
# composite objects) need to be reconciled without using a join # composite objects) need to be reconciled without using a join
# as multiple rows can be generated in the result set being returned # as multiple rows can be generated in the result set being returned
# from the database server. In this case, with traits, we use # from the database server. In this case, with traits, we use
# a selectinload pattern. # a selectinload pattern.
if traits_found: if traits_found:
query = model_query(models.Node).options( query = sa.select(models.Node).options(
Load(models.Node).load_only(*use_columns), selectinload(models.Node.traits),
selectinload(models.Node.traits)) Load(models.Node).load_only(*use_columns)
)
else: else:
query = model_query(models.Node).options( # Note for others, if you ask for a whole model, it is
Load(models.Node).load_only(*use_columns)) # modeled, i.e. you can access it as an object.
query = sa.select(models.NodeBase).options(
Load(models.Node).load_only(*use_columns)
)
query = self._add_nodes_filters(query, filters) query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker, return _paginate_query(models.Node, limit, marker,
sort_key, sort_dir, query) sort_key, sort_dir, query)
@ -618,40 +643,85 @@ class Connection(api.Connection):
return mapping return mapping
@synchronized(RESERVATION_SEMAPHORE, fair=True)
def _reserve_node_place_lock(self, tag, node_id, node):
try:
# NOTE(TheJulia): We explicitly do *not* synch the session
# so the other actions in the conductor do not become aware
# that the lock is in place and believe they hold the lock.
# This necessitates an overall lock in the code side, so
# we avoid conditions where two separate threads can believe
# they hold locks at the same time.
with _session_for_write() as session:
res = session.execute(
sa.update(models.Node).
where(models.Node.id == node.id).
where(models.Node.reservation == None). # noqa
values(reservation=tag).
execution_options(synchronize_session=False))
session.flush()
node = self._get_node_by_id_no_joins(node.id)
# NOTE(TheJulia): In SQLAlchemy 2.0 style, we don't
# magically get a changed node as they moved from the
# many ways to do things to singular ways to do things.
if res.rowcount != 1:
# Nothing updated and node exists. Must already be
# locked.
raise exception.NodeLocked(node=node.uuid,
host=node.reservation)
except NoResultFound:
# In the event that someone has deleted the node on
# another thread.
raise exception.NodeNotFound(node=node_id)
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def reserve_node(self, tag, node_id): def reserve_node(self, tag, node_id):
with _session_for_write(): with _session_for_read():
query = _get_node_query_with_all_for_single_node()
query = add_identity_filter(query, node_id)
count = query.filter_by(reservation=None).update(
{'reservation': tag}, synchronize_session=False)
try: try:
# TODO(TheJulia): Figure out a good way to query
# this so that we do it as light as possible without
# the full object invocation, which will speed lock
# activities. Granted, this is all at the DB level
# so maybe that is okay in the grand scheme of things.
query = model_query(models.Node)
query = add_identity_filter(query, node_id)
node = query.one() node = query.one()
if count != 1:
# Nothing updated and node exists. Must already be
# locked.
raise exception.NodeLocked(node=node.uuid,
host=node['reservation'])
return node
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound(node=node_id) raise exception.NodeNotFound(node=node_id)
if node.reservation:
# Fail fast, instead of attempt the update.
raise exception.NodeLocked(node=node.uuid,
host=node.reservation)
self._reserve_node_place_lock(tag, node_id, node)
# Return a node object as that is the contract for this method.
return self.get_node_by_id(node.id)
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def release_node(self, tag, node_id): def release_node(self, tag, node_id):
with _session_for_write(): with _session_for_read():
query = model_query(models.Node)
query = add_identity_filter(query, node_id)
# be optimistic and assume we usually release a reservation
count = query.filter_by(reservation=tag).update(
{'reservation': None}, synchronize_session=False)
try: try:
if count != 1: query = model_query(models.Node)
node = query.one() query = add_identity_filter(query, node_id)
if node['reservation'] is None: node = query.one()
except NoResultFound:
raise exception.NodeNotFound(node=node_id)
with _session_for_write() as session:
try:
res = session.execute(
sa.update(models.Node).
where(models.Node.id == node.id).
where(models.Node.reservation == tag).
values(reservation=None).
execution_options(synchronize_session=False)
)
node = self.get_node_by_id(node.id)
if res.rowcount != 1:
if node.reservation is None:
raise exception.NodeNotLocked(node=node.uuid) raise exception.NodeNotLocked(node=node.uuid)
else: else:
raise exception.NodeLocked(node=node.uuid, raise exception.NodeLocked(node=node.uuid,
host=node['reservation']) host=node['reservation'])
session.flush()
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound(node=node_id) raise exception.NodeNotFound(node=node_id)
@ -677,47 +747,68 @@ class Connection(api.Connection):
node = models.Node() node = models.Node()
node.update(values) node.update(values)
with _session_for_write() as session: try:
try: with _session_for_write() as session:
session.add(node) session.add(node)
# Set tags & traits to [] for new created node
# NOTE(mgoddard): We need to set the tags and traits fields in
# the session context, otherwise SQLAlchemy will try and fail
# to lazy load the attributes, resulting in an exception being
# raised.
node['tags'] = []
node['traits'] = []
session.flush() session.flush()
except db_exc.DBDuplicateEntry as exc: except db_exc.DBDuplicateEntry as exc:
if 'name' in exc.columns: if 'name' in exc.columns:
raise exception.DuplicateName(name=values['name']) raise exception.DuplicateName(name=values['name'])
elif 'instance_uuid' in exc.columns: elif 'instance_uuid' in exc.columns:
raise exception.InstanceAssociated( raise exception.InstanceAssociated(
instance_uuid=values['instance_uuid'], instance_uuid=values['instance_uuid'],
node=values['uuid']) node=values['uuid'])
raise exception.NodeAlreadyExists(uuid=values['uuid']) raise exception.NodeAlreadyExists(uuid=values['uuid'])
# Set tags & traits to [] for new created node
# NOTE(mgoddard): We need to set the tags and traits fields in the
# session context, otherwise SQLAlchemy will try and fail to lazy
# load the attributes, resulting in an exception being raised.
node['tags'] = []
node['traits'] = []
return node return node
def get_node_by_id(self, node_id): def _get_node_by_id_no_joins(self, node_id):
query = _get_node_query_with_all_for_single_node() # TODO(TheJulia): Maybe replace with this with a minimal
query = query.filter_by(id=node_id) # "get these three fields" thing.
try: try:
return query.one() with _session_for_read() as session:
# Explicitly load NodeBase as the invocation of the
# priamary model object reesults in the join query
# triggering.
return session.execute(
sa.select(models.NodeBase).filter_by(id=node_id).limit(1)
).scalars().first()
except NoResultFound:
raise exception.NodeNotFound(node=node_id)
def get_node_by_id(self, node_id):
try:
query = _get_node_select()
with _session_for_read() as session:
return session.scalars(
query.filter_by(id=node_id).limit(1)
).unique().one()
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound(node=node_id) raise exception.NodeNotFound(node=node_id)
def get_node_by_uuid(self, node_uuid): def get_node_by_uuid(self, node_uuid):
query = _get_node_query_with_all_for_single_node()
query = query.filter_by(uuid=node_uuid)
try: try:
return query.one() query = _get_node_select()
with _session_for_read() as session:
return session.scalars(
query.filter_by(uuid=node_uuid).limit(1)
).unique().one()
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound(node=node_uuid) raise exception.NodeNotFound(node=node_uuid)
def get_node_by_name(self, node_name): def get_node_by_name(self, node_name):
query = _get_node_query_with_all_for_single_node()
query = query.filter_by(name=node_name)
try: try:
return query.one() query = _get_node_select()
with _session_for_read() as session:
return session.scalars(
query.filter_by(name=node_name).limit(1)
).unique().one()
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound(node=node_name) raise exception.NodeNotFound(node=node_name)
@ -725,15 +816,14 @@ class Connection(api.Connection):
if not uuidutils.is_uuid_like(instance): if not uuidutils.is_uuid_like(instance):
raise exception.InvalidUUID(uuid=instance) raise exception.InvalidUUID(uuid=instance)
query = _get_node_query_with_all_for_single_node()
query = query.filter_by(instance_uuid=instance)
try: try:
result = query.one() query = _get_node_select()
with _session_for_read() as session:
return session.scalars(
query.filter_by(instance_uuid=instance).limit(1)
).unique().one()
except NoResultFound: except NoResultFound:
raise exception.InstanceNotFound(instance=instance) raise exception.InstanceNotFound(instance_uuid=instance)
return result
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def destroy_node(self, node_id): def destroy_node(self, node_id):
@ -849,6 +939,9 @@ class Connection(api.Connection):
# Return the updated node model joined with all relevant fields. # Return the updated node model joined with all relevant fields.
query = _get_node_query_with_all_for_single_node() query = _get_node_query_with_all_for_single_node()
query = add_identity_filter(query, node_id) query = add_identity_filter(query, node_id)
# FIXME(TheJulia): This entire method needs to be re-written to
# use the proper execution format for SQLAlchemy 2.0. Likely
# A query, independent update, and a re-query on the transaction.
return query.one() return query.one()
def get_port_by_id(self, port_id): def get_port_by_id(self, port_id):
@ -925,15 +1018,15 @@ class Connection(api.Connection):
port = models.Port() port = models.Port()
port.update(values) port.update(values)
with _session_for_write() as session: try:
try: with _session_for_write() as session:
session.add(port) session.add(port)
session.flush() session.flush()
except db_exc.DBDuplicateEntry as exc: except db_exc.DBDuplicateEntry as exc:
if 'address' in exc.columns: if 'address' in exc.columns:
raise exception.MACAlreadyExists(mac=values['address']) raise exception.MACAlreadyExists(mac=values['address'])
raise exception.PortAlreadyExists(uuid=values['uuid']) raise exception.PortAlreadyExists(uuid=values['uuid'])
return port return port
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def update_port(self, port_id, values): def update_port(self, port_id, values):
@ -1110,13 +1203,13 @@ class Connection(api.Connection):
chassis = models.Chassis() chassis = models.Chassis()
chassis.update(values) chassis.update(values)
with _session_for_write() as session: try:
try: with _session_for_write() as session:
session.add(chassis) session.add(chassis)
session.flush() session.flush()
except db_exc.DBDuplicateEntry: except db_exc.DBDuplicateEntry:
raise exception.ChassisAlreadyExists(uuid=values['uuid']) raise exception.ChassisAlreadyExists(uuid=values['uuid'])
return chassis return chassis
@oslo_db_api.retry_on_deadlock @oslo_db_api.retry_on_deadlock
def update_chassis(self, chassis_id, values): def update_chassis(self, chassis_id, values):
@ -1293,6 +1386,13 @@ class Connection(api.Connection):
def register_conductor_hardware_interfaces(self, conductor_id, interfaces): def register_conductor_hardware_interfaces(self, conductor_id, interfaces):
with _session_for_write() as session: with _session_for_write() as session:
try: try:
try:
session.begin()
except sa.exc.InvalidRequestError:
# When running unit tests, the transaction reports as
# already started, where as in service startup this is
# the first write op.
pass
for iface in interfaces: for iface in interfaces:
conductor_hw_iface = models.ConductorHardwareInterfaces() conductor_hw_iface = models.ConductorHardwareInterfaces()
conductor_hw_iface['conductor_id'] = conductor_id conductor_hw_iface['conductor_id'] = conductor_id
@ -1388,6 +1488,8 @@ class Connection(api.Connection):
q = q.filter(models.Port.address.in_(addresses)) q = q.filter(models.Port.address.in_(addresses))
try: try:
# FIXME(TheJulia): This needs to be updated to be
# an explicit query to identify the node for SQLAlchemy.
return q.one() return q.one()
except NoResultFound: except NoResultFound:
raise exception.NodeNotFound( raise exception.NodeNotFound(
@ -1586,6 +1688,8 @@ class Connection(api.Connection):
if not versions: if not versions:
return [] return []
if model_name == 'Node':
model_name = 'NodeBase'
model = models.get_class(model_name) model = models.get_class(model_name)
# NOTE(rloo): .notin_ does not handle null: # NOTE(rloo): .notin_ does not handle null:
@ -1614,7 +1718,11 @@ class Connection(api.Connection):
""" """
object_versions = release_mappings.get_object_versions() object_versions = release_mappings.get_object_versions()
table_missing_ok = False table_missing_ok = False
for model in models.Base.__subclasses__(): models_to_check = models.Base.__subclasses__()
# We need to append Node to the list as it is a subclass of
# NodeBase, which is intentional to delineate excess queries.
models_to_check.append(models.Node)
for model in models_to_check:
if model.__name__ not in object_versions: if model.__name__ not in object_versions:
continue continue
@ -1688,8 +1796,9 @@ class Connection(api.Connection):
mapping = release_mappings.RELEASE_MAPPING['master']['objects'] mapping = release_mappings.RELEASE_MAPPING['master']['objects']
total_to_migrate = 0 total_to_migrate = 0
total_migrated = 0 total_migrated = 0
all_models = models.Base.__subclasses__()
sql_models = [model for model in models.Base.__subclasses__() all_models.append(models.Node)
sql_models = [model for model in all_models
if model.__name__ in mapping] if model.__name__ in mapping]
for model in sql_models: for model in sql_models:
version = mapping[model.__name__][0] version = mapping[model.__name__][0]
@ -2238,6 +2347,7 @@ class Connection(api.Connection):
# Return the updated template joined with all relevant fields. # Return the updated template joined with all relevant fields.
query = _get_deploy_template_query_with_steps() query = _get_deploy_template_query_with_steps()
query = add_identity_filter(query, template_id) query = add_identity_filter(query, template_id)
# FIXME(TheJulia): This needs to be fixed for SQLAlchemy 2.0.
return query.one() return query.one()
except db_exc.DBDuplicateEntry as e: except db_exc.DBDuplicateEntry as e:
if 'name' in e.columns: if 'name' in e.columns:
@ -2260,6 +2370,7 @@ class Connection(api.Connection):
query = (_get_deploy_template_query_with_steps() query = (_get_deploy_template_query_with_steps()
.filter_by(**{field: value})) .filter_by(**{field: value}))
try: try:
# FIXME(TheJulia): This needs to be fixed for SQLAlchemy 2.0
return query.one() return query.one()
except NoResultFound: except NoResultFound:
raise exception.DeployTemplateNotFound(template=value) raise exception.DeployTemplateNotFound(template=value)


@@ -19,6 +19,7 @@ SQLAlchemy models for baremetal data.
 """
 
 from os import path
+from typing import List
 from urllib import parse as urlparse
 
 from oslo_db import options as db_options
@@ -27,8 +28,8 @@
 from oslo_db.sqlalchemy import types as db_types
 from sqlalchemy import Boolean, Column, DateTime, false, Index
 from sqlalchemy import ForeignKey, Integer
 from sqlalchemy import schema, String, Text
-from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import orm
+from sqlalchemy.orm import declarative_base
 
 from ironic.common import exception
 from ironic.common.i18n import _
@@ -116,8 +117,8 @@ class ConductorHardwareInterfaces(Base):
     default = Column(Boolean, default=False, nullable=False)
 
 
-class Node(Base):
-    """Represents a bare metal node."""
+class NodeBase(Base):
+    """Represents a base bare metal node."""
 
     __tablename__ = 'nodes'
     __table_args__ = (
@@ -213,6 +214,32 @@ class Node(Base):
 
     secure_boot = Column(Boolean, nullable=True)
 
 
+class Node(NodeBase):
+    """Represents a bare metal node."""
+
+    # NOTE(TheJulia): The purpose of the delineation between NodeBase and Node
+    # is to facilitate a hard delineation for queries where we do not need to
+    # populate additional information needlessly which would normally populate
+    # from the access of the property. In this case, Traits and Tags.
+    # The other reason we do this, is because these are generally "joined"
+    # data structures, we cannot de-duplicate node objects with unhashable dict
+    # data structures.
+
+    # NOTE(TheJulia): The choice of selectin lazy population is intentional
+    # as it causes a subselect to occur, skipping the need for deduplication
+    # in general. This puts a slightly higher query load on the DB server, but
+    # means *far* less gets shipped over the wire in the end.
+    traits: orm.Mapped[List['NodeTrait']] = orm.relationship(  # noqa
+        "NodeTrait",
+        back_populates="node",
+        lazy="selectin")
+
+    tags: orm.Mapped[List['NodeTag']] = orm.relationship(  # noqa
+        "NodeTag",
+        back_populates="node",
+        lazy="selectin")
+
+
 class Port(Base):
     """Represents a network port of a bare metal node."""
@@ -270,7 +297,6 @@ class NodeTag(Base):
 
     node = orm.relationship(
         "Node",
-        backref='tags',
         primaryjoin='and_(NodeTag.node_id == Node.id)',
         foreign_keys=node_id
     )
@@ -327,7 +353,6 @@ class NodeTrait(Base):
     trait = Column(String(255), primary_key=True, nullable=False)
     node = orm.relationship(
         "Node",
-        backref='traits',
         primaryjoin='and_(NodeTrait.node_id == Node.id)',
         foreign_keys=node_id
     )


@@ -91,13 +91,17 @@ class ReleaseMappingsTestCase(base.TestCase):
     def test_contains_all_db_objects(self):
         self.assertIn('master', release_mappings.RELEASE_MAPPING)
-        model_names = set((s.__name__ for s in models.Base.__subclasses__()))
+        use_models = models.Base.__subclasses__()
+        use_models.append(models.Node)
+        model_names = set((s.__name__ for s in use_models))
         # NOTE(xek): As a rule, all models which can be changed between
         # releases or are sent through RPC should have their counterpart
         # versioned objects. Do not add an exception for such objects,
         # initialize them with the version 1.0 instead.
+        # NodeBase is also excluded as it is covered by Node.
         exceptions = set(['NodeTag', 'ConductorHardwareInterfaces',
-                          'NodeTrait', 'DeployTemplateStep'])
+                          'NodeTrait', 'DeployTemplateStep',
+                          'NodeBase'])
         model_names -= exceptions
         # NodeTrait maps to two objects
         model_names |= set(['Trait', 'TraitList'])


@@ -127,7 +127,12 @@ class ServiceSetUpMixin(object):
     def setUp(self):
         super(ServiceSetUpMixin, self).setUp()
         self.hostname = 'test-host'
-        self.config(node_locked_retry_attempts=1, group='conductor')
+        # Relies upon the default number of "NodeLocked" retries as
+        # in unit testing, sqllite is not operated in a transactional
+        # way and utilizes asynchonous IO. Locking, in particular, can
+        # detect this, and it can cause some false or delayed inpressions
+        # of lock status, causing lock failures.
+        self.config(node_locked_retry_attempts=3, group='conductor')
         self.config(node_locked_retry_interval=0, group='conductor')
         self.service = manager.ConductorManager(self.hostname, 'test-topic')
@@ -139,15 +144,18 @@
             return
         self.service.del_host()
 
-    def _start_service(self, start_periodic_tasks=False):
+    def _start_service(self, start_periodic_tasks=False, start_consoles=True,
+                       start_allocations=True):
         if start_periodic_tasks:
-            self.service.init_host()
+            self.service.init_host(start_consoles=start_consoles,
+                                   start_allocations=start_allocations)
         else:
             with mock.patch.object(periodics, 'PeriodicWorker', autospec=True):
                 with mock.patch.object(pxe_utils, 'place_common_config',
                                        autospec=True):
                     self.service.prepare_host()
-                    self.service.init_host()
+                    self.service.init_host(start_consoles=start_consoles,
+                                           start_allocations=start_allocations)
         self.addCleanup(self._stop_service)


@@ -209,7 +209,7 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
                                           state='allocating',
                                           conductor_affinity=dead_conductor.id)
 
-        self._start_service()
+        self._start_service(start_allocations=False)
         with mock.patch.object(self.dbapi, 'get_offline_conductors',
                                autospec=True) as mock_conds:
             mock_conds.return_value = [dead_conductor.id]


@@ -494,9 +494,11 @@ class StartConsolesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         obj_utils.create_test_node(
             self.context,
             uuid=uuidutils.generate_uuid(),
-            driver='fake-hardware'
+            driver='fake-hardware',
         )
-        self._start_service()
+        # Enable consoles *after* service has started, otherwise it races
+        # as the service startup also launches consoles.
+        self._start_service(start_consoles=False)
         self.service._start_consoles(self.context)
         self.assertEqual(2, mock_start_console.call_count)
         mock_notify.assert_has_calls(


@@ -7256,7 +7256,7 @@ class DoNodeTakeOverTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
                                          mock_take_over,
                                          mock_start_console,
                                          mock_notify):
-        self._start_service()
+        self._start_service(start_consoles=False)
         node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                           console_enabled=True)
         di_info = node.driver_internal_info


@@ -166,7 +166,9 @@ class DbConductorTestCase(base.DbTestCase):
         c = self._create_test_cdr()
         self.dbapi.touch_conductor(c.hostname)
         self.assertEqual(2, mock_update.call_count)
-        self.assertEqual(2, mock_sleep.call_count)
+        # Count that it was called, but not the number of times
+        # as this is *actually* time.sleep via import from oslo_db.api
+        self.assertTrue(mock_sleep.called)
 
     def test_touch_conductor_not_found(self):
         # A conductor's heartbeat will not create a new record,


@@ -367,10 +367,10 @@ class DbNodeTestCase(base.DbTestCase):
         res = self.dbapi.get_node_list(filters={'maintenance': False})
         self.assertEqual([node1.id], [r.id for r in res])
 
-        res = self.dbapi.get_nodeinfo_list(filters={'fault': 'boom'})
+        res = self.dbapi.get_node_list(filters={'fault': 'boom'})
         self.assertEqual([node2.id], [r.id for r in res])
 
-        res = self.dbapi.get_nodeinfo_list(filters={'fault': 'moob'})
+        res = self.dbapi.get_node_list(filters={'fault': 'moob'})
         self.assertEqual([], [r.id for r in res])
 
         res = self.dbapi.get_node_list(filters={'resource_class': 'foo'})
@@ -558,6 +558,9 @@ class DbNodeTestCase(base.DbTestCase):
                         'cat': 'meow'},
             internal_info={'corgi': 'rocks'},
             deploy_interface='purring_machine')
+        utils.create_test_node_traits(node_id=node.id,
+                                      traits=['atrait'])
         uuids.append(str(node['uuid']))
         req_fields = ['uuid',
                      'provision_state',


@ -0,0 +1,7 @@
---
upgrade:
- |
Ironic has started the process of upgrading the code base to support
SQLAlchemy 2.0 in anticipation of it's release. This results in the
minimum version of SQLAlchemy becoming 1.4.0 as it contains migration
features for the move to SQLAlchemy 2.0.


@@ -6,7 +6,7 @@
 # of appearance. Changing the order has an impact on the overall integration
 # process, which may cause wedges in the gate later.
 pbr>=3.1.1 # Apache-2.0
-SQLAlchemy>=1.2.19 # MIT
+SQLAlchemy>=1.4.0 # MIT
 alembic>=1.4.2 # MIT
 automaton>=1.9.0 # Apache-2.0
 eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT


@@ -11,6 +11,7 @@ setenv = VIRTUAL_ENV={envdir}
     PYTHONDONTWRITEBYTECODE = 1
    LANGUAGE=en_US
    LC_ALL=en_US.UTF-8
+   PYTHONUNBUFFERED=1
 deps =
    -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master}
    -r{toxinidir}/requirements.txt
-r{toxinidir}/requirements.txt -r{toxinidir}/requirements.txt