Execute DB API methods in a single transaction

Many DB API methods do a few queries to a DB (e. g.  SELECT
and then UPDATE, or SELECT and then DELETE, etc).  By default,
a Session instance is used with autocommit=True, which means,
that each query to a DB is done in a separate transaction. This
is error-prone (as it may lead to race conditions or returning
of unexpected results) and makes rollbacks harder (if one of
transactions fail, the previous ones can not be rolled back).

This patch ensures that all DB API methods, which do a few
queries (or call private DB API methods), are executed inside
a single transaction.

Blueprint: db-session-cleanup

Change-Id: Ie6510becffdeb78048fe4a09511ab326627d3412
This commit is contained in:
Roman Podolyaka
2013-07-22 18:27:51 +03:00
parent 768ae7c626
commit 2f7d2dce0e

View File

@@ -1128,18 +1128,20 @@ def volume_get(context, volume_id):
@require_admin_context
def volume_get_all(context, marker, limit, sort_key, sort_dir):
query = _volume_get_query(context)
session = get_session()
with session.begin():
query = _volume_get_query(context, session=session)
marker_volume = None
if marker is not None:
marker_volume = _volume_get(context, marker)
marker_volume = None
if marker is not None:
marker_volume = _volume_get(context, marker, session=session)
query = sqlalchemyutils.paginate_query(query, models.Volume, limit,
[sort_key, 'created_at', 'id'],
marker=marker_volume,
sort_dir=sort_dir)
query = sqlalchemyutils.paginate_query(query, models.Volume, limit,
[sort_key, 'created_at', 'id'],
marker=marker_volume,
sort_dir=sort_dir)
return query.all()
return query.all()
@require_admin_context
@@ -1164,19 +1166,22 @@ def volume_get_all_by_instance_uuid(context, instance_uuid):
@require_context
def volume_get_all_by_project(context, project_id, marker, limit, sort_key,
sort_dir):
authorize_project_context(context, project_id)
query = _volume_get_query(context).filter_by(project_id=project_id)
session = get_session()
with session.begin():
authorize_project_context(context, project_id)
query = _volume_get_query(context, session).\
filter_by(project_id=project_id)
marker_volume = None
if marker is not None:
marker_volume = _volume_get(context, marker)
marker_volume = None
if marker is not None:
marker_volume = _volume_get(context, marker, session)
query = sqlalchemyutils.paginate_query(query, models.Volume, limit,
[sort_key, 'created_at', 'id'],
marker=marker_volume,
sort_dir=sort_dir)
query = sqlalchemyutils.paginate_query(query, models.Volume, limit,
[sort_key, 'created_at', 'id'],
marker=marker_volume,
sort_dir=sort_dir)
return query.all()
return query.all()
@require_admin_context
@@ -1194,13 +1199,15 @@ def volume_get_iscsi_target_num(context, volume_id):
@require_context
def volume_update(context, volume_id, values):
session = get_session()
metadata = values.get('metadata')
if metadata is not None:
volume_metadata_update(context,
volume_id,
values.pop('metadata'),
delete=True)
with session.begin():
metadata = values.get('metadata')
if metadata is not None:
_volume_metadata_update(context,
volume_id,
values.pop('metadata'),
delete=True,
session=session)
volume_ref = _volume_get(context, volume_id, session=session)
volume_ref.update(values)
volume_ref.save(session=session)
@@ -1217,8 +1224,8 @@ def _volume_metadata_get_query(context, volume_id, session=None):
@require_context
@require_volume_exists
def volume_metadata_get(context, volume_id):
rows = _volume_metadata_get_query(context, volume_id).all()
def _volume_metadata_get(context, volume_id, session=None):
rows = _volume_metadata_get_query(context, volume_id, session).all()
result = {}
for row in rows:
result[row['key']] = row['value']
@@ -1226,6 +1233,12 @@ def volume_metadata_get(context, volume_id):
return result
@require_context
@require_volume_exists
def volume_metadata_get(context, volume_id):
return _volume_metadata_get(context, volume_id)
@require_context
@require_volume_exists
def volume_metadata_delete(context, volume_id, key):
@@ -1237,7 +1250,6 @@ def volume_metadata_delete(context, volume_id, key):
@require_context
@require_volume_exists
def _volume_metadata_get_item(context, volume_id, key, session=None):
result = _volume_metadata_get_query(context, volume_id, session=session).\
filter_by(key=key).\
@@ -1257,38 +1269,49 @@ def volume_metadata_get_item(context, volume_id, key):
@require_context
@require_volume_exists
def volume_metadata_update(context, volume_id, metadata, delete):
session = get_session()
def _volume_metadata_update(context, volume_id, metadata, delete,
session=None):
if not session:
session = get_session()
# Set existing metadata to deleted if delete argument is True
if delete:
original_metadata = volume_metadata_get(context, volume_id)
for meta_key, meta_value in original_metadata.iteritems():
if meta_key not in metadata:
with session.begin(subtransactions=True):
# Set existing metadata to deleted if delete argument is True
if delete:
original_metadata = _volume_metadata_get(context, volume_id,
session)
for meta_key, meta_value in original_metadata.iteritems():
if meta_key not in metadata:
meta_ref = _volume_metadata_get_item(context, volume_id,
meta_key, session)
meta_ref.update({'deleted': True})
meta_ref.save(session=session)
meta_ref = None
# Now update all existing items with new values, or create new meta
# objects
for meta_key, meta_value in metadata.items():
# update the value whether it exists or not
item = {"value": meta_value}
try:
meta_ref = _volume_metadata_get_item(context, volume_id,
meta_key, session)
meta_ref.update({'deleted': True})
meta_ref.save(session=session)
except exception.VolumeMetadataNotFound as e:
meta_ref = models.VolumeMetadata()
item.update({"key": meta_key, "volume_id": volume_id})
meta_ref = None
meta_ref.update(item)
meta_ref.save(session=session)
# Now update all existing items with new values, or create new meta objects
for meta_key, meta_value in metadata.items():
return metadata
# update the value whether it exists or not
item = {"value": meta_value}
try:
meta_ref = _volume_metadata_get_item(context, volume_id,
meta_key, session)
except exception.VolumeMetadataNotFound as e:
meta_ref = models.VolumeMetadata()
item.update({"key": meta_key, "volume_id": volume_id})
meta_ref.update(item)
meta_ref.save(session=session)
return metadata
@require_context
@require_volume_exists
def volume_metadata_update(context, volume_id, metadata, delete):
return _volume_metadata_update(context, volume_id, metadata, delete)
###################
@@ -1434,8 +1457,8 @@ def _snapshot_metadata_get_query(context, snapshot_id, session=None):
@require_context
@require_snapshot_exists
def snapshot_metadata_get(context, snapshot_id):
rows = _snapshot_metadata_get_query(context, snapshot_id).all()
def _snapshot_metadata_get(context, snapshot_id, session=None):
rows = _snapshot_metadata_get_query(context, snapshot_id, session).all()
result = {}
for row in rows:
result[row['key']] = row['value']
@@ -1443,6 +1466,12 @@ def snapshot_metadata_get(context, snapshot_id):
return result
@require_context
@require_snapshot_exists
def snapshot_metadata_get(context, snapshot_id):
return _snapshot_metadata_get(context, snapshot_id)
@require_context
@require_snapshot_exists
def snapshot_metadata_delete(context, snapshot_id, key):
@@ -1454,7 +1483,6 @@ def snapshot_metadata_delete(context, snapshot_id, key):
@require_context
@require_snapshot_exists
def _snapshot_metadata_get_item(context, snapshot_id, key, session=None):
result = _snapshot_metadata_get_query(context,
snapshot_id,
@@ -1478,36 +1506,39 @@ def snapshot_metadata_get_item(context, snapshot_id, key):
@require_snapshot_exists
def snapshot_metadata_update(context, snapshot_id, metadata, delete):
session = get_session()
with session.begin():
# Set existing metadata to deleted if delete argument is True
if delete:
original_metadata = _snapshot_metadata_get(context, snapshot_id,
session)
for meta_key, meta_value in original_metadata.iteritems():
if meta_key not in metadata:
meta_ref = _snapshot_metadata_get_item(context,
snapshot_id,
meta_key, session)
meta_ref.update({'deleted': True})
meta_ref.save(session=session)
# Set existing metadata to deleted if delete argument is True
if delete:
original_metadata = snapshot_metadata_get(context, snapshot_id)
for meta_key, meta_value in original_metadata.iteritems():
if meta_key not in metadata:
meta_ref = None
# Now update all existing items with new values, or create new meta
# objects
for meta_key, meta_value in metadata.items():
# update the value whether it exists or not
item = {"value": meta_value}
try:
meta_ref = _snapshot_metadata_get_item(context, snapshot_id,
meta_key, session)
meta_ref.update({'deleted': True})
meta_ref.save(session=session)
except exception.SnapshotMetadataNotFound as e:
meta_ref = models.SnapshotMetadata()
item.update({"key": meta_key, "snapshot_id": snapshot_id})
meta_ref = None
meta_ref.update(item)
meta_ref.save(session=session)
# Now update all existing items with new values, or create new meta objects
for meta_key, meta_value in metadata.items():
# update the value whether it exists or not
item = {"value": meta_value}
try:
meta_ref = _snapshot_metadata_get_item(context, snapshot_id,
meta_key, session)
except exception.SnapshotMetadataNotFound as e:
meta_ref = models.SnapshotMetadata()
item.update({"key": meta_key, "snapshot_id": snapshot_id})
meta_ref.update(item)
meta_ref.save(session=session)
return metadata
return metadata
###################
@@ -1540,7 +1571,7 @@ def volume_type_create(context, values):
models.VolumeTypeExtraSpecs)
volume_type_ref = models.VolumeTypes()
volume_type_ref.update(values)
volume_type_ref.save()
volume_type_ref.save(session=session)
except Exception as e:
raise db_exc.DBError(e)
return volume_type_ref
@@ -1615,10 +1646,10 @@ def volume_type_get_by_name(context, name):
@require_admin_context
def volume_type_destroy(context, id):
_volume_type_get(context, id)
session = get_session()
with session.begin():
_volume_type_get(context, id, session)
session.query(models.VolumeTypes).\
filter_by(id=id).\
update({'deleted': True,
@@ -1674,12 +1705,14 @@ def volume_type_extra_specs_get(context, volume_type_id):
@require_context
def volume_type_extra_specs_delete(context, volume_type_id, key):
session = get_session()
_volume_type_extra_specs_get_item(context, volume_type_id, key, session)
_volume_type_extra_specs_query(context, volume_type_id).\
filter_by(key=key).\
update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
with session.begin():
_volume_type_extra_specs_get_item(context, volume_type_id, key,
session)
_volume_type_extra_specs_query(context, volume_type_id, session).\
filter_by(key=key).\
update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
@require_context
@@ -1707,18 +1740,20 @@ def volume_type_extra_specs_get_item(context, volume_type_id, key):
def volume_type_extra_specs_update_or_create(context, volume_type_id,
specs):
session = get_session()
spec_ref = None
for key, value in specs.iteritems():
try:
spec_ref = _volume_type_extra_specs_get_item(
context, volume_type_id, key, session)
except exception.VolumeTypeExtraSpecsNotFound as e:
spec_ref = models.VolumeTypeExtraSpecs()
spec_ref.update({"key": key, "value": value,
"volume_type_id": volume_type_id,
"deleted": False})
spec_ref.save(session=session)
return specs
with session.begin():
spec_ref = None
for key, value in specs.iteritems():
try:
spec_ref = _volume_type_extra_specs_get_item(
context, volume_type_id, key, session)
except exception.VolumeTypeExtraSpecsNotFound as e:
spec_ref = models.VolumeTypeExtraSpecs()
spec_ref.update({"key": key, "value": value,
"volume_type_id": volume_type_id,
"deleted": False})
spec_ref.save(session=session)
return specs
####################
@@ -1798,8 +1833,9 @@ def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id):
"""
session = get_session()
metadata = _volume_glance_metadata_get(context, volume_id, session=session)
with session.begin():
metadata = _volume_glance_metadata_get(context, volume_id,
session=session)
for meta in metadata:
vol_glance_metadata = models.VolumeGlanceMetadata()
vol_glance_metadata.snapshot_id = snapshot_id
@@ -1821,10 +1857,10 @@ def volume_glance_metadata_copy_from_volume_to_volume(context,
"""
session = get_session()
metadata = _volume_glance_metadata_get(context,
src_volume_id,
session=session)
with session.begin():
metadata = _volume_glance_metadata_get(context,
src_volume_id,
session=session)
for meta in metadata:
vol_glance_metadata = models.VolumeGlanceMetadata()
vol_glance_metadata.volume_id = volume_id
@@ -1844,9 +1880,9 @@ def volume_glance_metadata_copy_to_volume(context, volume_id, snapshot_id):
"""
session = get_session()
metadata = _volume_snapshot_glance_metadata_get(context, snapshot_id,
session=session)
with session.begin():
metadata = _volume_snapshot_glance_metadata_get(context, snapshot_id,
session=session)
for meta in metadata:
vol_glance_metadata = models.VolumeGlanceMetadata()
vol_glance_metadata.volume_id = volume_id