Merge "[server side] Fix race issue for port forwarding plugin"

This commit is contained in:
Zuul 2018-08-01 03:27:44 +00:00 committed by Gerrit Code Review
commit 7d83f9d288
5 changed files with 290 additions and 28 deletions

View File

@ -56,3 +56,4 @@ class PortForwarding(model_base.BASEV2, model_base.HasId):
lazy='subquery', uselist=True,
cascade='delete')
)
revises_on_change = ('floating_ip', 'port',)

View File

@ -23,3 +23,7 @@ class PortForwardingNotFound(n_exc.NotFound):
class PortForwardingNotSupportFilterField(n_exc.BadRequest):
message = _("Port Forwarding filter %(filter)s is not supported.")
class FipInUseByPortForwarding(n_exc.InUse):
message = _("Floating IP %(id)s in use by Port Forwarding resources.")

View File

@ -19,7 +19,9 @@ import functools
import netaddr
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.api.definitions import l3
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_consts
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
@ -27,6 +29,7 @@ from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_log import log as logging
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
@ -41,6 +44,8 @@ from neutron.objects import port_forwarding as pf
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
LOG = logging.getLogger(__name__)
def make_result_with_fields(f):
@functools.wraps(f)
@ -97,6 +102,116 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
result_dict[apidef.COLLECTION_NAME] = port_forwarding_result
return result_dict
@registry.receives(resources.FLOATING_IP, [events.PRECOMMIT_UPDATE,
events.PRECOMMIT_DELETE])
def _check_floatingip_request(self, resource, event, trigger, context,
**kwargs):
# We only support the "free" floatingip to be associated with
# port forwarding resources. And in the PUT request of floatingip,
# the request body must contain a "port_id" field which is not
# allowed in port forwarding functionality.
floatingip_id = None
if event == events.PRECOMMIT_UPDATE:
fip_db = kwargs.get('floatingip_db')
floatingip_id = fip_db.id
# Here the key-value must contain a floatingip param, and the value
# must a dict with key 'floatingip'.
if not kwargs['floatingip']['floatingip'].get('port_id'):
# Only care about the associate floatingip cases.
# The port_id field is a must-option. But if a floatingip
# disassociate a internal port, the port_id should be null.
LOG.debug('Skip check for floatingip %s, as the update '
'request does not contain port_id.', floatingip_id)
return
elif event == events.PRECOMMIT_DELETE:
floatingip_id = kwargs.get('port').get('device_id')
if not floatingip_id:
return
exist_pf_resources = pf.PortForwarding.get_objects(
context, floatingip_id=floatingip_id)
if exist_pf_resources:
raise pf_exc.FipInUseByPortForwarding(id=floatingip_id)
@registry.receives(resources.PORT, [events.AFTER_UPDATE,
events.PRECOMMIT_DELETE])
@db_api.retry_if_session_inactive()
def _process_port_request(self, resource, event, trigger, context,
**kwargs):
# Deleting floatingip will receive port resource with precommit_delete
# event, so just return, then check the request in
# _check_floatingip_request callback.
if kwargs['port']['device_owner'].startswith(
lib_consts.DEVICE_OWNER_FLOATINGIP):
return
# This block is used for checking if there are some fixed ips updates.
# Whatever the event is AFTER_UPDATE/PRECOMMIT_DELETE,
# we will use the update_ip_set for checking if the possible associated
# port forwarding resources need to be deleted for port's AFTER_UPDATE
# event. Or get all affected ip addresses for port's PRECOMMIT_DELETE
# event.
port_id = kwargs['port']['id']
update_fixed_ips = kwargs['port']['fixed_ips']
update_ip_set = set()
for update_fixed_ip in update_fixed_ips:
if (netaddr.IPNetwork(update_fixed_ip.get('ip_address')).version ==
lib_consts.IP_VERSION_4):
update_ip_set.add(update_fixed_ip.get('ip_address'))
if not update_ip_set:
return
# If the port owner wants to update or delete port, we must elevate the
# context to check if the floatingip or port forwarding resources
# are owned by other tenants.
if not context.is_admin:
context = context.elevated()
# If the logic arrives here, that means we have got update_ip_set and
# its value is not None. So we need to get all port forwarding
# resources based on the request port_id for preparing the next
# process, such as deleting them.
pf_resources = pf.PortForwarding.get_objects(
context, internal_port_id=port_id)
if not pf_resources:
return
# If the logic arrives here, that means we have got pf_resources and
# its value is not None either. Then we collect all ip addresses
# which are used by port forwarding resources to generate used_ip_set,
# and we default to set remove_ip_set as used_ip_set which means we
# want to delete all port forwarding resources when event is
# PRECOMMIT_DELETE. And when event is AFTER_UPDATE, we get the
# different part.
used_ip_set = set()
for pf_resource in pf_resources:
used_ip_set.add(str(pf_resource.internal_ip_address))
remove_ip_set = used_ip_set
if event == events.AFTER_UPDATE:
remove_ip_set = used_ip_set - update_ip_set
if not remove_ip_set:
return
# Here, we get the remove_ip_set, the following block will delete the
# port forwarding resources based on remove_ip_set. Just need to note
# here, if event is AFTER_UPDATE, and remove_ip_set is empty, the
# following block won't be processed.
remove_port_forwarding_list = []
with db_api.context_manager.writer.using(context):
for pf_resource in pf_resources:
if str(pf_resource.internal_ip_address) in remove_ip_set:
pf_objs = pf.PortForwarding.get_objects(
context, floatingip_id=pf_resource.floatingip_id)
if len(pf_objs) == 1 and pf_objs[0].id == pf_resource.id:
fip_obj = router.FloatingIP.get_object(
context, id=pf_resource.floatingip_id)
fip_obj.update_fields({'router_id': None})
fip_obj.update()
pf_resource.delete()
remove_port_forwarding_list.append(pf_resource)
self.push_api.push(context, remove_port_forwarding_list,
rpc_events.DELETED)
def _get_internal_ip_subnet(self, request_ip, fixed_ips):
request_ip = netaddr.IPNetwork(request_ip)
for fixed_ip in fixed_ips:
@ -166,34 +281,45 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
port_forwarding):
port_forwarding = port_forwarding.get(apidef.RESOURCE_NAME)
port_forwarding['floatingip_id'] = floatingip_id
pf_obj = pf.PortForwarding(context, **port_forwarding)
try:
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
if fip_obj.fixed_port_id:
raise lib_l3_exc.FloatingIPPortAlreadyAssociated(
port_id=port_forwarding['internal_port_id'],
fip_id=fip_obj.id,
floating_ip_address=fip_obj.floating_ip_address,
fixed_ip=str(port_forwarding['internal_ip_address']),
net_id=fip_obj.floating_network_id)
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
pf_obj = pf.PortForwarding(context, **port_forwarding)
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
# If this func does not raise an exception, means the
# router_id matched.
# case1: fip_obj.router_id = None
# case2: fip_obj.router_id is the same with we selected.
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
if not fip_obj.router_id:
fip_obj.router_id = router_id
fip_obj.update()
# If this func does not raise an exception, means the
# router_id matched.
# case1: fip_obj.router_id = None
# case2: fip_obj.router_id is the same with we selected.
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
if not fip_obj.router_id:
values = {'router_id': router_id, 'fixed_port_id': None}
router.FloatingIP.update_objects(
context, values, id=floatingip_id)
try:
pf_obj.create()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, port_forwarding)
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
except obj_exc.NeutronDbObjectDuplicateEntry:
(__,
conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, port_forwarding)
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting "
"values are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
@db_base_plugin_common.convert_result_to_dict
def update_floatingip_port_forwarding(self, context, id, floatingip_id,

View File

@ -10,10 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import mock
import netaddr
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from oslo_utils import uuidutils
from six.moves import queue
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import pf_plugin
@ -40,6 +46,20 @@ class PortForwardingTestCaseBase(ml2_test_base.ML2TestFramework):
def _get_floatingip(self, floatingip_id):
return self.l3_plugin.get_floatingip(self.context, floatingip_id)
def _update_floatingip(self, fip_id, update_info):
return self.l3_plugin.update_floatingip(
self.context, fip_id, {"floatingip": update_info})
def _delete_floatingip(self, fip_id):
return self.l3_plugin.delete_floatingip(self.context, fip_id)
def _update_port(self, port_id, update_info):
return self.core_plugin.update_port(
self.context, port_id, {'port': update_info})
def _delete_port(self, port_id):
return self.core_plugin.delete_port(self.context, port_id)
def _add_router_interface(self, router_id, subnet_id):
interface_info = {"subnet_id": subnet_id}
self.l3_plugin.add_router_interface(
@ -260,3 +280,108 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
self.assertRaises(pf_exc.PortForwardingNotFound,
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())
def _simulate_concurrent_requests_process_and_raise(
self, funcs, args_list):
class SimpleThread(threading.Thread):
def __init__(self, q):
super(SimpleThread, self).__init__()
self.q = q
self.exception = None
def run(self):
try:
while not self.q.empty():
item = None
try:
item = self.q.get(False)
func, func_args = item[0], item[1]
func(*func_args)
except queue.Empty:
pass
finally:
if item:
self.q.task_done()
except Exception as e:
self.exception = e
def get_exception(self):
return self.exception
q = queue.Queue()
for func, func_args in zip(funcs, args_list):
q.put_nowait((func, func_args))
threads = []
for _ in range(len(funcs)):
t = SimpleThread(q)
threads.append(t)
t.start()
q.join()
for t in threads:
e = t.get_exception()
if e:
raise e
def test_concurrent_create_port_forwarding_delete_fip(self):
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._delete_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'],)]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
port_forwardings = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'], fields=['id'])
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, port_forwardings[0][apidef.ID],
floatingip_id=self.fip['id'])
funcs.reverse()
args_list.reverse()
self.assertRaises(lib_l3_exc.FloatingIPNotFound,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_fip(self):
newport = self._create_port(self.fmt, self.net['id']).json['port']
func1 = self.pf_plugin.create_floatingip_port_forwarding
func2 = self._update_floatingip
funcs = [func1, func2]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.fip['id'], {'port_id': newport['id']})]
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
funcs.reverse()
args_list.reverse()
self.assertRaises(c_exc.CallbackFailure,
self._simulate_concurrent_requests_process_and_raise,
funcs, args_list)
def test_concurrent_create_port_forwarding_update_port(self):
new_ip = str(
netaddr.IPAddress(self.port['fixed_ips'][0]['ip_address']) + 2)
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._update_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'], {
'fixed_ips': [{'subnet_id': self.subnet['id'],
'ip_address': new_ip}]})]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))
def test_concurrent_create_port_forwarding_delete_port(self):
funcs = [self.pf_plugin.create_floatingip_port_forwarding,
self._delete_port]
args_list = [(self.context, self.fip['id'], self.port_forwarding),
(self.port['id'],)]
self._simulate_concurrent_requests_process_and_raise(funcs, args_list)
self.assertEqual([], self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id']))

View File

@ -31,6 +31,7 @@ from neutron.api.rpc.handlers import resources_rpc
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron import manager
from neutron.objects import base as obj_base
from neutron.objects import port_forwarding
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
@ -188,6 +189,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.pf_plugin.update_floatingip_port_forwarding,
self.ctxt, 'pf_id', **pf_input)
@mock.patch.object(obj_base.NeutronDbObject, 'update_objects')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(pf_plugin.PortForwardingPlugin, '_check_router_match')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
@ -197,7 +199,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
@mock.patch('neutron.objects.port_forwarding.PortForwarding')
def test_create_floatingip_port_forwarding(
self, mock_port_forwarding, mock_fip_get_object, mock_find_router,
mock_check_router_match, mock_push_api):
mock_check_router_match, mock_push_api, mock_update_objects):
# Update fip
pf_input = {
'port_forwarding':
@ -210,11 +212,12 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
fip_obj.router_id = ''
fip_obj.fixed_port_id = ''
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(fip_obj.update.called)
self.assertTrue(mock_update_objects.called)
self.assertTrue(pf_obj.create.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@ -223,15 +226,17 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
pf_obj.reset_mock()
fip_obj.reset_mock()
mock_port_forwarding.reset_mock()
mock_update_objects.reset_mock()
mock_push_api.reset_mock()
mock_port_forwarding.return_value = pf_obj
fip_obj.router_id = 'router_id'
fip_obj.fixed_port_id = ''
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(pf_obj.create.called)
self.assertFalse(fip_obj.update.called)
self.assertFalse(mock_update_objects.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@ -256,6 +261,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
fip_obj = mock.Mock()
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
fip_obj.fixed_port_id = ''
pf_obj.create.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
mock.Mock(), mock.Mock())