Replace more instances of model_query

The model_query call results in a nested read transaction, which does
not seem to play well with SQLite support. Since it inherently relies
on the query style deprecated in SQLAlchemy 2.0, we need to migrate
away from this call. As an intermediate step, change instances of
model_query to session.query, making sure every call creates a session
that lives as long as is needed to fetch the results.
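
A minimal sketch of the before/after pattern (models.Thing and
get_thing are illustrative placeholders; _session_for_read is the
enginefacade helper this module already provides):

    # Before: model_query opens its own nested read transaction.
    def get_thing(thing_id):
        return model_query(models.Thing).filter_by(id=thing_id).one()

    # After: the caller owns a session that stays open until the
    # results have been fetched.
    def get_thing(thing_id):
        with _session_for_read() as session:
            return session.query(models.Thing).filter_by(
                id=thing_id).one()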

Removes a unit test which was built around creating a fake deadlock
condition to test that oslo_db was working as expected. Its
interaction was entirely mocked, and in retooling the base method
there was no easy way to keep the same test logic around.

Co-Authored-By: Dmitry Tantsur <dtantsur@protonmail.com>
Change-Id: Ic8b1d964f7be5784e01c89bfb6c0277ea82eec2d
Julia Kreger 2022-10-24 09:18:13 -07:00
parent 58fd2425f6
commit 5d5ae59538
2 changed files with 225 additions and 287 deletions


@ -34,7 +34,6 @@ from osprofiler import sqlalchemy as osp_sqlalchemy
import sqlalchemy as sa
from sqlalchemy import or_
from sqlalchemy.exc import NoResultFound, MultipleResultsFound
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Load
from sqlalchemy.orm import selectinload
from sqlalchemy import sql
@ -86,63 +85,6 @@ def _wrap_session(session):
return session
def _get_node_query_with_all_for_single_node():
"""Return a query object for the Node joined with all relevant fields.
Deprecated: This method, while useful, returns a "Legacy Query"
object which is considered a legacy construct in SQLAlchemy and may
be removed at some point. SQLAlchemy encourages all users to move
to the unified ORM/Core Select interface.
This method utilizes a joined load query, which creates a result
set where the corresponding traits and tags are joined into the
result set.
This is more efficient from a queries-per-second standpoint with
the backend database, as the joined tables are not fetched through
separate, distinct queries executed by the client.
The downside is that there may be multiple tags and traits for each
node. Ultimately this style of query forces SQLAlchemy to
de-duplicate the result set, because the database returns the node's
portion of the result set for every trait, tag, or other table field
the query is joined with.
This looks like:
node1, tag1, trait1
node1, tag1, trait2
node1, tag1, trait3
node1, tag2, trait1
Et cetera, to create:
node1, [tag1, tag2], [trait1, trait2, trait3]
Joins are especially inefficient for Ironic where nodes are being
enumerated, as the above result set pattern applies not just to one
node, but to potentially thousands of nodes. Please consider using
_get_node_select, which results in a primary query for the nodes and
then performs additional targeted queries for the joined tables, as
opposed to performing client-side de-duplication.
:returns: a query object.
"""
# NOTE(TheJulia): This *likely* ought to be selectinload, however
# it is a very common hit pattern for Ironic to query just the node.
# In those sorts of locations, the performance issues are less noticeable
# to end users. *IF/WHEN* we change it to be selectinload for nodes,
# the resulting DB load will see a queries per second increase, which
# we should be careful about.
# NOTE(TheJulia): Basic benchmark difference
# Test data creation: 67.202 seconds.
# 2.43 seconds to obtain all nodes from SQLAlchemy (10k nodes)
# 5.15 seconds to obtain all nodes *and* have node objects (10k nodes)
return (model_query(models.Node)
.options(joinedload(models.Node.tags))
.options(joinedload(models.Node.traits)))
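For contrast, a selectinload-based select (the approach
_get_node_select takes) issues one primary query for the nodes plus
one targeted query per relationship, avoiding the client-side
de-duplication described above; a rough sketch, with the option chain
assumed from the surrounding imports:

    # Two additional queries total (tags, traits), regardless of how
    # many nodes are enumerated, and no duplicated node rows.
    node_select = (sa.select(models.Node)
                   .options(selectinload(models.Node.tags))
                   .options(selectinload(models.Node.traits)))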
def _get_node_select():
"""Returns a SQLAlchemy Select Object for Nodes.
@ -182,15 +124,6 @@ def _get_deploy_template_select_with_steps():
).options(selectinload(models.DeployTemplate.steps))
def _get_deploy_template_query_with_steps():
"""Return a query object for the DeployTemplate joined with steps.
:returns: a query object.
"""
return model_query(models.DeployTemplate).options(
selectinload(models.DeployTemplate.steps))
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
@ -367,7 +300,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
# NOTE(TheJulia): We can't just ask for the bool of query if it is
# populated, so we need to ask if it is None.
if query is None:
query = model_query(model)
query = sa.select(model)
sort_keys = ['id']
if sort_key and sort_key not in sort_keys:
sort_keys.insert(0, sort_key)
@ -385,6 +318,12 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
if isinstance(query, sa_orm.Query):
# The classic "Legacy" ORM query object result set which is
# deprecated in advance of SQLAlchemy 2.0.
# TODO(TheJulia): Calls of this style basically need to be
# eliminated in ironic, as returning this way does not allow
# commit or rollback in enginefacade to occur until the returned
# object is garbage collected, because ORM Query objects allow
# for DB interactions to occur after the fact, so the object
# remains connected to the DB.
return query.all()
else:
# In this case, we have a sqlalchemy.sql.selectable.Select
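In the Select case the rows must be materialized while a session is
still open; a sketch of that branch, assuming an enginefacade reader
session:

    with _session_for_read() as session:
        # scalars() unwraps single-entity rows; all() materializes
        # the results before the session and its transaction close.
        return session.execute(query).scalars().all()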
@ -653,19 +592,20 @@ class Connection(api.Connection):
raise exception.NodeNotFound(
_("Nodes cannot be found: %s") % ', '.join(missing))
query = model_query(models.Node.uuid, models.Node.name).filter(
sql.or_(models.Node.uuid.in_(uuids),
models.Node.name.in_(names))
)
if project:
query = query.filter((models.Node.owner == project)
| (models.Node.lessee == project))
with _session_for_read() as session:
query = session.query(models.Node.uuid, models.Node.name).filter(
sql.or_(models.Node.uuid.in_(uuids),
models.Node.name.in_(names))
)
if project:
query = query.filter((models.Node.owner == project)
| (models.Node.lessee == project))
for row in query:
if row[0] in idents:
mapping[row[0]] = row[0]
if row[1] and row[1] in idents:
mapping[row[1]] = row[0]
for row in query:
if row[0] in idents:
mapping[row[0]] = row[0]
if row[1] and row[1] in idents:
mapping[row[1]] = row[0]
missing = idents - set(mapping)
if missing:
@ -707,14 +647,14 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def reserve_node(self, tag, node_id):
with _session_for_read():
with _session_for_read() as session:
try:
# TODO(TheJulia): Figure out a good way to query
# this so that we do it as light as possible without
# the full object invocation, which will speed lock
# activities. Granted, this is all at the DB level
# so maybe that is okay in the grand scheme of things.
query = model_query(models.Node)
query = session.query(models.Node)
query = add_identity_filter(query, node_id)
node = query.one()
except NoResultFound:
@ -729,9 +669,9 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def release_node(self, tag, node_id):
with _session_for_read():
with _session_for_read() as session:
try:
query = model_query(models.Node)
query = session.query(models.Node)
query = add_identity_filter(query, node_id)
node = query.one()
except NoResultFound:
@ -859,7 +799,7 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_node(self, node_id):
with _session_for_write() as session:
query = model_query(models.Node)
query = session.query(models.Node)
query = add_identity_filter(query, node_id)
try:
@ -877,44 +817,45 @@ class Connection(api.Connection):
if uuidutils.is_uuid_like(node_id):
node_id = node_ref['id']
port_query = model_query(models.Port)
port_query = session.query(models.Port)
port_query = add_port_filter_by_node(port_query, node_id)
port_query.delete()
portgroup_query = model_query(models.Portgroup)
portgroup_query = session.query(models.Portgroup)
portgroup_query = add_portgroup_filter_by_node(portgroup_query,
node_id)
portgroup_query.delete()
# Delete all tags attached to the node
tag_query = model_query(models.NodeTag).filter_by(node_id=node_id)
tag_query = session.query(models.NodeTag).filter_by(
node_id=node_id)
tag_query.delete()
# Delete all traits attached to the node
trait_query = model_query(
trait_query = session.query(
models.NodeTrait).filter_by(node_id=node_id)
trait_query.delete()
volume_connector_query = model_query(
volume_connector_query = session.query(
models.VolumeConnector).filter_by(node_id=node_id)
volume_connector_query.delete()
volume_target_query = model_query(
volume_target_query = session.query(
models.VolumeTarget).filter_by(node_id=node_id)
volume_target_query.delete()
# delete all bios attached to the node
bios_settings_query = model_query(
bios_settings_query = session.query(
models.BIOSSetting).filter_by(node_id=node_id)
bios_settings_query.delete()
# delete all allocations for this node
allocation_query = model_query(
allocation_query = session.query(
models.Allocation).filter_by(node_id=node_id)
allocation_query.delete()
# delete all history for this node
history_query = model_query(
history_query = session.query(
models.NodeHistory).filter_by(node_id=node_id)
history_query.delete()
@ -942,10 +883,10 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def _do_update_node(self, node_id, values):
with _session_for_write():
with _session_for_write() as session:
# NOTE(mgoddard): Don't issue a joined query for the update as this
# does not work with PostgreSQL.
query = model_query(models.Node)
query = session.query(models.Node)
query = add_identity_filter(query, node_id)
try:
ref = query.with_for_update().one()
@ -1069,7 +1010,7 @@ class Connection(api.Connection):
raise exception.InvalidParameterValue(err=msg)
try:
with _session_for_write() as session:
query = model_query(models.Port)
query = session.query(models.Port)
query = add_port_filter(query, port_id)
ref = query.one()
ref.update(values)
@ -1085,8 +1026,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_port(self, port_id):
with _session_for_write():
query = model_query(models.Port)
with _session_for_write() as session:
query = session.query(models.Port)
query = add_port_filter(query, port_id)
count = query.delete()
if count == 0:
@ -1126,7 +1067,7 @@ class Connection(api.Connection):
def get_portgroup_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
query = model_query(models.Portgroup)
query = sa.select(models.Portgroup)
if project:
query = add_portgroup_filter_by_node_project(query, project)
return _paginate_query(models.Portgroup, limit, marker,
@ -1134,7 +1075,7 @@ class Connection(api.Connection):
def get_portgroups_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
query = model_query(models.Portgroup)
query = sa.select(models.Portgroup)
query = query.where(models.Portgroup.node_id == node_id)
if project:
query = add_portgroup_filter_by_node_project(query, project)
@ -1171,7 +1112,7 @@ class Connection(api.Connection):
with _session_for_write() as session:
try:
query = model_query(models.Portgroup)
query = session.query(models.Portgroup)
query = add_portgroup_filter(query, portgroup_id)
ref = query.one()
ref.update(values)
@ -1256,8 +1197,8 @@ class Connection(api.Connection):
msg = _("Cannot overwrite UUID for an existing Chassis.")
raise exception.InvalidParameterValue(err=msg)
with _session_for_write():
query = model_query(models.Chassis)
with _session_for_write() as session:
query = session.query(models.Chassis)
query = add_identity_where(query, models.Chassis, chassis_id)
count = query.update(values)
@ -1268,19 +1209,14 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_chassis(self, chassis_id):
def chassis_not_empty():
"""Checks whether the chassis does not have nodes."""
query = model_query(models.Node)
with _session_for_write() as session:
query = session.query(models.Node)
query = add_node_filter_by_chassis(query, chassis_id)
return query.count() != 0
with _session_for_write():
if chassis_not_empty():
if query.count() != 0:
raise exception.ChassisNotEmpty(chassis=chassis_id)
query = model_query(models.Chassis)
query = session.query(models.Chassis)
query = add_identity_filter(query, chassis_id)
count = query.delete()
@ -1290,7 +1226,7 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def register_conductor(self, values, update_existing=False):
with _session_for_write() as session:
query = (model_query(models.Conductor)
query = (session.query(models.Conductor)
.filter_by(hostname=values['hostname']))
try:
ref = query.one()
@ -1337,21 +1273,23 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def touch_conductor(self, hostname):
with _session_for_write():
query = model_query(models.Conductor)
query = query.where(models.Conductor.hostname == hostname)
# since we're not changing any other field, manually set updated_at
# and since we're heartbeating, make sure that online=True
count = query.update({'updated_at': timeutils.utcnow(),
'online': True})
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
with _session_for_write() as session:
query = sa.update(models.Conductor).where(
models.Conductor.hostname == hostname
).values({
'updated_at': timeutils.utcnow(),
'online': True}
).execution_options(synchronize_session=False)
res = session.execute(query)
count = res.rowcount
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
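Condensed, the rewritten heartbeat relies on the Core-style UPDATE
returning a rowcount instead of the legacy Query.update() result; a
sketch of the same pattern:

    stmt = (sa.update(models.Conductor)
            .where(models.Conductor.hostname == hostname)
            .values(online=True, updated_at=timeutils.utcnow())
            .execution_options(synchronize_session=False))
    with _session_for_write() as session:
        if session.execute(stmt).rowcount == 0:
            raise exception.ConductorNotFound(conductor=hostname)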
@oslo_db_api.retry_on_deadlock
def clear_node_reservations_for_conductor(self, hostname):
nodes = []
with _session_for_write():
query = (model_query(models.Node)
with _session_for_write() as session:
query = (session.query(models.Node)
.filter(models.Node.reservation.ilike(hostname)))
nodes = [node['uuid'] for node in query]
query.update({'reservation': None}, synchronize_session=False)
@ -1365,8 +1303,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def clear_node_target_power_state(self, hostname):
nodes = []
with _session_for_write():
query = (model_query(models.Node)
with _session_for_write() as session:
query = (session.query(models.Node)
.filter(models.Node.reservation.ilike(hostname)))
query = query.filter(models.Node.target_power_state != sql.null())
nodes = [node['uuid'] for node in query]
@ -1384,58 +1322,56 @@ class Connection(api.Connection):
'%(nodes)s', {'nodes': nodes})
def get_active_hardware_type_dict(self, use_groups=False):
query = (model_query(models.ConductorHardwareInterfaces,
models.Conductor)
.join(models.Conductor))
result = _filter_active_conductors(query)
with _session_for_read() as session:
query = (session.query(models.ConductorHardwareInterfaces,
models.Conductor)
.join(models.Conductor))
result = _filter_active_conductors(query)
d2c = collections.defaultdict(set)
for iface_row, cdr_row in result:
hw_type = iface_row['hardware_type']
if use_groups:
key = '%s:%s' % (cdr_row['conductor_group'], hw_type)
else:
key = hw_type
d2c[key].add(cdr_row['hostname'])
d2c = collections.defaultdict(set)
for iface_row, cdr_row in result:
hw_type = iface_row['hardware_type']
if use_groups:
key = '%s:%s' % (cdr_row['conductor_group'], hw_type)
else:
key = hw_type
d2c[key].add(cdr_row['hostname'])
return d2c
def get_offline_conductors(self, field='hostname'):
field = getattr(models.Conductor, field)
interval = CONF.conductor.heartbeat_timeout
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
result = (model_query(field)
.filter(models.Conductor.updated_at < limit))
return [row[0] for row in result]
with _session_for_read() as session:
field = getattr(models.Conductor, field)
interval = CONF.conductor.heartbeat_timeout
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
result = (session.query(field)
.filter(models.Conductor.updated_at < limit))
return [row[0] for row in result]
def get_online_conductors(self):
query = model_query(models.Conductor.hostname)
query = _filter_active_conductors(query)
return [row[0] for row in query]
with _session_for_read() as session:
query = session.query(models.Conductor.hostname)
query = _filter_active_conductors(query)
return [row[0] for row in query]
def list_conductor_hardware_interfaces(self, conductor_id):
query = (model_query(models.ConductorHardwareInterfaces)
.where(models.ConductorHardwareInterfaces.conductor_id == conductor_id)) # noqa
return query.all()
with _session_for_read() as session:
query = (session.query(models.ConductorHardwareInterfaces)
.filter_by(conductor_id=conductor_id))
return query.all()
def list_hardware_type_interfaces(self, hardware_types):
query = (model_query(models.ConductorHardwareInterfaces)
.filter(models.ConductorHardwareInterfaces.hardware_type
.in_(hardware_types)))
with _session_for_read() as session:
query = (session.query(models.ConductorHardwareInterfaces)
.filter(models.ConductorHardwareInterfaces.hardware_type
.in_(hardware_types)))
query = _filter_active_conductors(query)
return query.all()
query = _filter_active_conductors(query)
return query.all()
@oslo_db_api.retry_on_deadlock
def register_conductor_hardware_interfaces(self, conductor_id, interfaces):
with _session_for_write() as session:
try:
try:
session.begin()
except sa.exc.InvalidRequestError:
# When running unit tests, the transaction reports as
# already started, whereas in service startup this is
# the first write op.
pass
for iface in interfaces:
conductor_hw_iface = models.ConductorHardwareInterfaces()
conductor_hw_iface['conductor_id'] = conductor_id
@ -1450,22 +1386,22 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unregister_conductor_hardware_interfaces(self, conductor_id):
with _session_for_write():
query = (model_query(models.ConductorHardwareInterfaces)
with _session_for_write() as session:
query = (session.query(models.ConductorHardwareInterfaces)
.filter_by(conductor_id=conductor_id))
query.delete()
@oslo_db_api.retry_on_deadlock
def touch_node_provisioning(self, node_id):
with _session_for_write():
query = model_query(models.Node)
with _session_for_write() as session:
query = session.query(models.Node)
query = add_identity_filter(query, node_id)
count = query.update({'provision_updated_at': timeutils.utcnow()})
if count == 0:
raise exception.NodeNotFound(node=node_id)
def _check_node_exists(self, node_id):
if not model_query(models.Node).where(
def _check_node_exists(self, session, node_id):
if not session.query(models.Node).where(
models.Node.id == node_id).scalar():
raise exception.NodeNotFound(node=node_id)
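With _check_node_exists now accepting the caller's session, the
existence check and the follow-up query share a single transaction; a
condensed sketch of the calling pattern used below:

    with _session_for_read() as session:
        self._check_node_exists(session, node_id)  # raises NodeNotFound
        return (session.query(models.NodeTag)
                .filter_by(node_id=node_id)
                .all())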
@ -1485,24 +1421,25 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unset_node_tags(self, node_id):
self._check_node_exists(node_id)
with _session_for_write():
model_query(models.NodeTag).filter_by(node_id=node_id).delete()
with _session_for_write() as session:
self._check_node_exists(session, node_id)
session.query(models.NodeTag).filter_by(node_id=node_id).delete()
def get_node_tags_by_node_id(self, node_id):
self._check_node_exists(node_id)
result = (model_query(models.NodeTag)
.filter_by(node_id=node_id)
.all())
with _session_for_read() as session:
self._check_node_exists(session, node_id)
result = (session.query(models.NodeTag)
.filter_by(node_id=node_id)
.all())
return result
@oslo_db_api.retry_on_deadlock
def add_node_tag(self, node_id, tag):
node_tag = models.NodeTag(tag=tag, node_id=node_id)
self._check_node_exists(node_id)
try:
with _session_for_write() as session:
node_tag = models.NodeTag(tag=tag, node_id=node_id)
self._check_node_exists(session, node_id)
session.add(node_tag)
session.flush()
except db_exc.DBDuplicateEntry:
@ -1513,18 +1450,20 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_node_tag(self, node_id, tag):
self._check_node_exists(node_id)
with _session_for_write():
result = model_query(models.NodeTag).filter_by(
with _session_for_write() as session:
self._check_node_exists(session, node_id)
result = session.query(models.NodeTag).filter_by(
node_id=node_id, tag=tag).delete()
if not result:
raise exception.NodeTagNotFound(node_id=node_id, tag=tag)
if not result:
raise exception.NodeTagNotFound(node_id=node_id, tag=tag)
def node_tag_exists(self, node_id, tag):
self._check_node_exists(node_id)
q = model_query(models.NodeTag).filter_by(node_id=node_id, tag=tag)
return model_query(q.exists()).scalar()
with _session_for_read() as session:
self._check_node_exists(session, node_id)
q = session.query(models.NodeTag).filter_by(
node_id=node_id, tag=tag)
return session.query(q.exists()).scalar()
def get_node_by_port_addresses(self, addresses):
q = _get_node_select()
@ -1549,7 +1488,7 @@ class Connection(api.Connection):
def get_volume_connector_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
query = model_query(models.VolumeConnector)
query = sa.select(models.VolumeConnector)
if project:
query = add_volume_conn_filter_by_node_project(query, project)
return _paginate_query(models.VolumeConnector, limit, marker,
@ -1573,7 +1512,7 @@ class Connection(api.Connection):
def get_volume_connectors_by_node_id(self, node_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
query = model_query(models.VolumeConnector).where(
query = sa.select(models.VolumeConnector).where(
models.VolumeConnector.node_id == node_id)
if project:
add_volume_conn_filter_by_node_project(query, project)
@ -1608,7 +1547,7 @@ class Connection(api.Connection):
try:
with _session_for_write() as session:
query = model_query(models.VolumeConnector)
query = session.query(models.VolumeConnector)
query = add_identity_filter(query, ident)
ref = query.one()
orig_type = ref['type']
@ -1626,8 +1565,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_volume_connector(self, ident):
with _session_for_write():
query = model_query(models.VolumeConnector)
with _session_for_write() as session:
query = session.query(models.VolumeConnector)
query = add_identity_filter(query, ident)
count = query.delete()
if count == 0:
@ -1635,7 +1574,7 @@ class Connection(api.Connection):
def get_volume_target_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
query = model_query(models.VolumeTarget)
query = sa.select(models.VolumeTarget)
if project:
query = add_volume_target_filter_by_node_project(query, project)
return _paginate_query(models.VolumeTarget, limit, marker,
@ -1659,7 +1598,8 @@ class Connection(api.Connection):
def get_volume_targets_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None,
project=None):
query = model_query(models.VolumeTarget).filter_by(node_id=node_id)
query = sa.select(models.VolumeTarget).where(
models.VolumeTarget.node_id == node_id)
if project:
add_volume_target_filter_by_node_project(query, project)
return _paginate_query(models.VolumeTarget, limit, marker, sort_key,
@ -1668,7 +1608,7 @@ class Connection(api.Connection):
def get_volume_targets_by_volume_id(self, volume_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
query = model_query(models.VolumeTarget).where(
query = sa.select(models.VolumeTarget).where(
models.VolumeTarget.volume_id == volume_id)
if project:
query = add_volume_target_filter_by_node_project(query, project)
@ -1702,7 +1642,7 @@ class Connection(api.Connection):
try:
with _session_for_write() as session:
query = model_query(models.VolumeTarget)
query = session.query(models.VolumeTarget)
query = add_identity_filter(query, ident)
ref = query.one()
orig_boot_index = ref['boot_index']
@ -1717,8 +1657,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_volume_target(self, ident):
with _session_for_write():
query = model_query(models.VolumeTarget)
with _session_for_write() as session:
query = session.query(models.VolumeTarget)
query = add_identity_filter(query, ident)
count = query.delete()
if count == 0:
@ -1850,10 +1790,11 @@ class Connection(api.Connection):
all_models.append(models.Node)
sql_models = [model for model in all_models
if model.__name__ in mapping]
for model in sql_models:
version = mapping[model.__name__][0]
query = model_query(model).filter(model.version != version)
total_to_migrate += query.count()
with _session_for_read() as session:
for model in sql_models:
version = mapping[model.__name__][0]
query = session.query(model).filter(model.version != version)
total_to_migrate += query.count()
if not total_to_migrate:
return total_to_migrate, 0
@ -1877,8 +1818,8 @@ class Connection(api.Connection):
for model in sql_models:
version = mapping[model.__name__][0]
num_migrated = 0
with _session_for_write():
query = model_query(model).filter(model.version != version)
with _session_for_write() as session:
query = session.query(model).filter(model.version != version)
# NOTE(rloo) Caution here; after doing query.count(), it is
# possible that the value is different in the
# next invocation of the query.
@ -1890,14 +1831,14 @@ class Connection(api.Connection):
for obj in query.slice(0, max_to_migrate):
ids.append(obj['id'])
num_migrated = (
model_query(model).
session.query(model).
filter(sql.and_(model.id.in_(ids),
model.version != version)).
update({model.version: version},
synchronize_session=False))
else:
num_migrated = (
model_query(model).
session.query(model).
filter(model.version != version).
update({model.version: version},
synchronize_session=False))
@ -1948,15 +1889,16 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unset_node_traits(self, node_id):
self._check_node_exists(node_id)
with _session_for_write():
model_query(models.NodeTrait).filter_by(node_id=node_id).delete()
with _session_for_write() as session:
self._check_node_exists(session, node_id)
session.query(models.NodeTrait).filter_by(node_id=node_id).delete()
def get_node_traits_by_node_id(self, node_id):
self._check_node_exists(node_id)
result = (model_query(models.NodeTrait)
.filter_by(node_id=node_id)
.all())
with _session_for_read() as session:
self._check_node_exists(session, node_id)
result = (session.query(models.NodeTrait)
.filter_by(node_id=node_id)
.all())
return result
@oslo_db_api.retry_on_deadlock
@ -1964,13 +1906,14 @@ class Connection(api.Connection):
node_trait = models.NodeTrait(trait=trait, node_id=node_id,
version=version)
self._check_node_exists(node_id)
try:
with _session_for_write() as session:
self._check_node_exists(session, node_id)
session.add(node_trait)
session.flush()
num_traits = (model_query(models.NodeTrait)
num_traits = (session.query(models.NodeTrait)
.filter_by(node_id=node_id).count())
self._verify_max_traits_per_node(node_id, num_traits)
except db_exc.DBDuplicateEntry:
@ -1981,25 +1924,26 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_node_trait(self, node_id, trait):
self._check_node_exists(node_id)
with _session_for_write():
result = model_query(models.NodeTrait).filter_by(
with _session_for_write() as session:
self._check_node_exists(session, node_id)
result = session.query(models.NodeTrait).filter_by(
node_id=node_id, trait=trait).delete()
if not result:
raise exception.NodeTraitNotFound(node_id=node_id, trait=trait)
if not result:
raise exception.NodeTraitNotFound(node_id=node_id, trait=trait)
def node_trait_exists(self, node_id, trait):
self._check_node_exists(node_id)
q = model_query(
models.NodeTrait).filter_by(node_id=node_id, trait=trait)
return model_query(q.exists()).scalar()
with _session_for_read() as session:
self._check_node_exists(session, node_id)
q = session.query(
models.NodeTrait).filter_by(node_id=node_id, trait=trait)
return session.query(q.exists()).scalar()
@oslo_db_api.retry_on_deadlock
def create_bios_setting_list(self, node_id, settings, version):
self._check_node_exists(node_id)
bios_settings = []
with _session_for_write() as session:
self._check_node_exists(session, node_id)
try:
for setting in settings:
bios_setting = models.BIOSSetting(
@ -2026,12 +1970,12 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def update_bios_setting_list(self, node_id, settings, version):
self._check_node_exists(node_id)
bios_settings = []
with _session_for_write() as session:
self._check_node_exists(session, node_id)
try:
for setting in settings:
query = model_query(models.BIOSSetting).filter_by(
query = session.query(models.BIOSSetting).filter_by(
node_id=node_id, name=setting['name'])
ref = query.one()
ref.update({'value': setting['value'],
@ -2057,11 +2001,11 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_bios_setting_list(self, node_id, names):
self._check_node_exists(node_id)
missing_bios_settings = []
with _session_for_write():
with _session_for_write() as session:
self._check_node_exists(session, node_id)
for name in names:
count = model_query(models.BIOSSetting).filter_by(
count = session.query(models.BIOSSetting).filter_by(
node_id=node_id, name=name).delete()
if count == 0:
missing_bios_settings.append(name)
@ -2070,20 +2014,22 @@ class Connection(api.Connection):
node=node_id, names=','.join(missing_bios_settings))
def get_bios_setting(self, node_id, name):
self._check_node_exists(node_id)
query = model_query(models.BIOSSetting).filter_by(
node_id=node_id, name=name)
try:
ref = query.one()
except NoResultFound:
raise exception.BIOSSettingNotFound(node=node_id, name=name)
with _session_for_read() as session:
self._check_node_exists(session, node_id)
query = session.query(models.BIOSSetting).filter_by(
node_id=node_id, name=name)
try:
ref = query.one()
except NoResultFound:
raise exception.BIOSSettingNotFound(node=node_id, name=name)
return ref
def get_bios_setting_list(self, node_id):
self._check_node_exists(node_id)
result = (model_query(models.BIOSSetting)
.filter_by(node_id=node_id)
.all())
with _session_for_read() as session:
self._check_node_exists(session, node_id)
result = (session.query(models.BIOSSetting)
.filter_by(node_id=node_id)
.all())
return result
def get_allocation_by_id(self, allocation_id):
@ -2093,11 +2039,13 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
query = model_query(models.Allocation).filter_by(id=allocation_id)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=allocation_id)
with _session_for_read() as session:
query = session.query(models.Allocation).filter_by(
id=allocation_id)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=allocation_id)
def get_allocation_by_uuid(self, allocation_uuid):
"""Return an allocation representation.
@ -2106,11 +2054,13 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
query = model_query(models.Allocation).filter_by(uuid=allocation_uuid)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=allocation_uuid)
with _session_for_read() as session:
query = session.query(models.Allocation).filter_by(
uuid=allocation_uuid)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=allocation_uuid)
def get_allocation_by_name(self, name):
"""Return an allocation representation.
@ -2119,11 +2069,12 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
query = model_query(models.Allocation).filter_by(name=name)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=name)
with _session_for_read() as session:
query = session.query(models.Allocation).filter_by(name=name)
try:
return query.one()
except NoResultFound:
raise exception.AllocationNotFound(allocation=name)
def get_allocation_list(self, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None):
@ -2142,8 +2093,9 @@ class Connection(api.Connection):
(asc, desc)
:returns: A list of allocations.
"""
query = self._add_allocations_filters(model_query(models.Allocation),
filters)
query = self._add_allocations_filters(
sa.select(models.Allocation),
filters)
return _paginate_query(models.Allocation, limit, marker,
sort_key, sort_dir, query)
@ -2200,14 +2152,14 @@ class Connection(api.Connection):
with _session_for_write() as session:
try:
query = model_query(models.Allocation, session=session)
query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
ref = query.one()
ref.update(values)
instance_uuid = ref.uuid
if values.get('node_id') and update_node:
node = model_query(models.Node, session=session).filter_by(
node = session.query(models.Node).filter_by(
id=ref.node_id).with_for_update().one()
node_uuid = node.uuid
if node.instance_uuid and node.instance_uuid != ref.uuid:
@ -2252,7 +2204,7 @@ class Connection(api.Connection):
"""
with _session_for_write() as session:
try:
query = model_query(models.Allocation, session=session)
query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
# NOTE(dtantsur): the FOR UPDATE clause locks the allocation
ref = query.with_for_update().one()
@ -2275,7 +2227,7 @@ class Connection(api.Connection):
:raises: AllocationNotFound
"""
with _session_for_write() as session:
query = model_query(models.Allocation)
query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
try:
@ -2285,7 +2237,7 @@ class Connection(api.Connection):
allocation_id = ref['id']
node_query = model_query(models.Node, session=session).filter_by(
node_query = session.query(models.Node).filter_by(
allocation_id=allocation_id)
node_query.update({'allocation_id': None, 'instance_uuid': None})
@ -2338,7 +2290,7 @@ class Connection(api.Connection):
return step.interface, step.step, sortable_args, step.priority
# List all existing steps for the template.
current_steps = (model_query(models.DeployTemplateStep)
current_steps = (session.query(models.DeployTemplateStep)
.filter_by(deploy_template_id=template_id))
# List the new steps for the template.
@ -2362,7 +2314,7 @@ class Connection(api.Connection):
# Delete and create steps in bulk as necessary.
if step_ids_to_delete:
((model_query(models.DeployTemplateStep)
((session.query(models.DeployTemplateStep)
.filter(models.DeployTemplateStep.id.in_(step_ids_to_delete)))
.delete(synchronize_session=False))
if steps_to_create:
@ -2378,7 +2330,7 @@ class Connection(api.Connection):
with _session_for_write() as session:
# NOTE(mgoddard): Don't issue a joined query for the update as
# this does not work with PostgreSQL.
query = model_query(models.DeployTemplate)
query = session.query(models.DeployTemplate)
query = add_identity_filter(query, template_id)
ref = query.with_for_update().one()
# First, update non-step columns.
@ -2406,10 +2358,10 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_deploy_template(self, template_id):
with _session_for_write():
model_query(models.DeployTemplateStep).filter_by(
with _session_for_write() as session:
session.query(models.DeployTemplateStep).filter_by(
deploy_template_id=template_id).delete()
count = model_query(models.DeployTemplate).filter_by(
count = session.query(models.DeployTemplate).filter_by(
id=template_id).delete()
if count == 0:
raise exception.DeployTemplateNotFound(template=template_id)
@ -2439,7 +2391,8 @@ class Connection(api.Connection):
def get_deploy_template_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = _get_deploy_template_query_with_steps()
query = model_query(models.DeployTemplate).options(
selectinload(models.DeployTemplate.steps))
return _paginate_query(models.DeployTemplate, limit, marker,
sort_key, sort_dir, query)
@ -2469,8 +2422,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_node_history_by_uuid(self, history_uuid):
with _session_for_write():
query = model_query(models.NodeHistory).filter_by(
with _session_for_write() as session:
query = session.query(models.NodeHistory).filter_by(
uuid=history_uuid)
count = query.delete()
if count == 0:
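A pattern that recurs throughout this file: Query.delete() returns
the number of rows removed, which doubles as the not-found check; a
condensed sketch (the exception name is assumed from ironic's
conventions):

    with _session_for_write() as session:
        count = (session.query(models.NodeHistory)
                 .filter_by(uuid=history_uuid)
                 .delete())
        if count == 0:
            raise exception.NodeHistoryNotFound(history=history_uuid)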


@ -18,9 +18,6 @@
import datetime
from unittest import mock
import oslo_db
from oslo_db import exception as db_exc
from oslo_db import sqlalchemy
from oslo_utils import timeutils
from ironic.common import exception
@ -158,18 +155,6 @@ class DbConductorTestCase(base.DbTestCase):
c = self.dbapi.get_conductor(c.hostname)
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
@mock.patch.object(oslo_db.api.time, 'sleep', autospec=True)
@mock.patch.object(sqlalchemy.orm.Query, 'update', autospec=True)
def test_touch_conductor_deadlock(self, mock_update, mock_sleep):
mock_sleep.return_value = None
mock_update.side_effect = [db_exc.DBDeadlock(), None]
c = self._create_test_cdr()
self.dbapi.touch_conductor(c.hostname)
self.assertEqual(2, mock_update.call_count)
# Check that it was called, but not the number of times,
# as this is *actually* time.sleep imported via oslo_db.api
self.assertTrue(mock_sleep.called)
def test_touch_conductor_not_found(self):
# A conductor's heartbeat will not create a new record,
# it will only update existing ones