Catch DBDuplicateEntry errors in RBAC code

This catches duplicate entry errors in the RBAC code
and converts them into an an appropriate exception so
clients get an HTTP conflict instead of an internal
server error.

Conflicts:
	neutron/db/rbac_db_mixin.py

Change-Id: I957ade2356ae9cb5bbb7e2322b4dcb37706665cf
Closes-Bug: #1551473
(cherry picked from commit 9712e364fc)
This commit is contained in:
Kevin Benton 2016-02-29 15:35:45 -08:00 committed by Ihar Hrachyshka
parent 4b1d8dca9b
commit 4848c717c3
3 changed files with 23 additions and 6 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import exception as db_exc
from sqlalchemy.orm import exc
from neutron.callbacks import events
@ -43,12 +44,15 @@ class RbacPluginMixin(common_db_mixin.CommonDbMixin):
raise n_exc.InvalidInput(error_message=e)
dbmodel = models.get_type_model_map()[e['object_type']]
tenant_id = self._get_tenant_id_for_create(context, e)
with context.session.begin(subtransactions=True):
db_entry = dbmodel(object_id=e['object_id'],
target_tenant=e['target_tenant'],
action=e['action'],
tenant_id=tenant_id)
context.session.add(db_entry)
try:
with context.session.begin(subtransactions=True):
db_entry = dbmodel(object_id=e['object_id'],
target_tenant=e['target_tenant'],
action=e['action'],
tenant_id=tenant_id)
context.session.add(db_entry)
except db_exc.DBDuplicateEntry:
raise ext_rbac.DuplicateRbacPolicy()
return self._make_rbac_policy_dict(db_entry)
def _make_rbac_policy_dict(self, db_entry, fields=None):

View File

@ -32,6 +32,10 @@ class RbacPolicyInUse(n_exc.Conflict):
"because other objects depend on it.\nDetails: %(details)s")
class DuplicateRbacPolicy(n_exc.Conflict):
message = _("An RBAC policy already exists with those values.")
def convert_valid_object_type(otype):
normalized = otype.strip().lower()
if normalized in rbac_db_models.get_type_model_map():

View File

@ -237,6 +237,15 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
update_res['rbac_policy'].pop('target_tenant')
self.assertEqual(res['policy'], update_res['rbac_policy'])
@test.idempotent_id('86c3529b-1231-40de-803c-affefefef321')
def test_duplicate_policy_error(self):
res = self._make_admin_net_and_subnet_shared_to_tenant_id(
self.client.tenant_id)
with testtools.ExpectedException(lib_exc.Conflict):
self.admin_client.create_rbac_policy(
object_type='network', object_id=res['network']['id'],
action='access_as_shared', target_tenant=self.client.tenant_id)
@test.attr(type='smoke')
@test.idempotent_id('86c3529b-1231-40de-803c-afffffff3fff')
def test_port_presence_prevents_network_rbac_policy_deletion(self):