Minimizing lock for port forwarding
Closes-Bug: #1824911 Change-Id: Id02b879cde8d2e6e90850b7cc389edbecd13a479
This commit is contained in:
parent
975143fd08
commit
054fd61223
@ -27,6 +27,7 @@ from neutron.api.rpc.callbacks.consumer import registry
|
|||||||
from neutron.api.rpc.callbacks import events
|
from neutron.api.rpc.callbacks import events
|
||||||
from neutron.api.rpc.callbacks import resources
|
from neutron.api.rpc.callbacks import resources
|
||||||
from neutron.api.rpc.handlers import resources_rpc
|
from neutron.api.rpc.handlers import resources_rpc
|
||||||
|
from neutron.common import coordination
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -53,22 +54,22 @@ class RouterFipPortForwardingMapping(object):
|
|||||||
"""
|
"""
|
||||||
self.router_fip_mapping = collections.defaultdict(set)
|
self.router_fip_mapping = collections.defaultdict(set)
|
||||||
|
|
||||||
|
@lockutils.synchronized('port-forwarding-cache')
|
||||||
def set_port_forwardings(self, port_forwardings):
|
def set_port_forwardings(self, port_forwardings):
|
||||||
for port_forwarding in port_forwardings:
|
for port_forwarding in port_forwardings:
|
||||||
self.set_fip_port_forwarding(port_forwarding.floatingip_id,
|
self._set_fip_port_forwarding(port_forwarding.floatingip_id,
|
||||||
port_forwarding,
|
port_forwarding,
|
||||||
port_forwarding.router_id)
|
port_forwarding.router_id)
|
||||||
|
|
||||||
|
@lockutils.synchronized('port-forwarding-cache')
|
||||||
def update_port_forwardings(self, port_forwardings):
|
def update_port_forwardings(self, port_forwardings):
|
||||||
for port_forwarding in port_forwardings:
|
for port_forwarding in port_forwardings:
|
||||||
self.managed_port_forwardings[port_forwarding.id] = port_forwarding
|
self.managed_port_forwardings[port_forwarding.id] = port_forwarding
|
||||||
|
|
||||||
def get_port_forwarding(self, port_forwarding_id):
|
@lockutils.synchronized('port-forwarding-cache')
|
||||||
return self.managed_port_forwardings.get(port_forwarding_id)
|
|
||||||
|
|
||||||
def del_port_forwardings(self, port_forwardings):
|
def del_port_forwardings(self, port_forwardings):
|
||||||
for port_forwarding in port_forwardings:
|
for port_forwarding in port_forwardings:
|
||||||
if not self.get_port_forwarding(port_forwarding.id):
|
if not self.managed_port_forwardings.get(port_forwarding.id):
|
||||||
continue
|
continue
|
||||||
self.managed_port_forwardings.pop(port_forwarding.id)
|
self.managed_port_forwardings.pop(port_forwarding.id)
|
||||||
self.fip_port_forwarding[port_forwarding.floatingip_id].remove(
|
self.fip_port_forwarding[port_forwarding.floatingip_id].remove(
|
||||||
@ -80,11 +81,12 @@ class RouterFipPortForwardingMapping(object):
|
|||||||
if not self.router_fip_mapping[port_forwarding.router_id]:
|
if not self.router_fip_mapping[port_forwarding.router_id]:
|
||||||
del self.router_fip_mapping[port_forwarding.router_id]
|
del self.router_fip_mapping[port_forwarding.router_id]
|
||||||
|
|
||||||
def set_fip_port_forwarding(self, fip_id, pf, router_id):
|
def _set_fip_port_forwarding(self, fip_id, pf, router_id):
|
||||||
self.router_fip_mapping[router_id].add(fip_id)
|
self.router_fip_mapping[router_id].add(fip_id)
|
||||||
self.fip_port_forwarding[fip_id].add(pf.id)
|
self.fip_port_forwarding[fip_id].add(pf.id)
|
||||||
self.managed_port_forwardings[pf.id] = pf
|
self.managed_port_forwardings[pf.id] = pf
|
||||||
|
|
||||||
|
@lockutils.synchronized('port-forwarding-cache')
|
||||||
def clear_by_fip(self, fip_id, router_id):
|
def clear_by_fip(self, fip_id, router_id):
|
||||||
self.router_fip_mapping[router_id].remove(fip_id)
|
self.router_fip_mapping[router_id].remove(fip_id)
|
||||||
if len(self.router_fip_mapping[router_id]) == 0:
|
if len(self.router_fip_mapping[router_id]) == 0:
|
||||||
@ -93,8 +95,9 @@ class RouterFipPortForwardingMapping(object):
|
|||||||
del self.managed_port_forwardings[pf_id]
|
del self.managed_port_forwardings[pf_id]
|
||||||
del self.fip_port_forwarding[fip_id]
|
del self.fip_port_forwarding[fip_id]
|
||||||
|
|
||||||
|
@lockutils.synchronized('port-forwarding-cache')
|
||||||
def check_port_forwarding_changes(self, new_pf):
|
def check_port_forwarding_changes(self, new_pf):
|
||||||
old_pf = self.get_port_forwarding(new_pf.id)
|
old_pf = self.managed_port_forwardings.get(new_pf.id)
|
||||||
return old_pf != new_pf
|
return old_pf != new_pf
|
||||||
|
|
||||||
|
|
||||||
@ -120,7 +123,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
def consume_api(self, agent_api):
|
def consume_api(self, agent_api):
|
||||||
self.agent_api = agent_api
|
self.agent_api = agent_api
|
||||||
|
|
||||||
@lockutils.synchronized('port-forwarding')
|
|
||||||
def _handle_notification(self, context, resource_type,
|
def _handle_notification(self, context, resource_type,
|
||||||
forwardings, event_type):
|
forwardings, event_type):
|
||||||
for forwarding in forwardings:
|
for forwarding in forwardings:
|
||||||
@ -167,6 +169,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
iptables_manager.ipv4['nat'].add_chain(chain)
|
iptables_manager.ipv4['nat'].add_chain(chain)
|
||||||
iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag)
|
iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag)
|
||||||
|
|
||||||
|
@coordination.synchronized('port-forwarding-{namespace}')
|
||||||
def _process_create(self, port_forwardings, ri, interface_name, namespace,
|
def _process_create(self, port_forwardings, ri, interface_name, namespace,
|
||||||
iptables_manager):
|
iptables_manager):
|
||||||
if not port_forwardings:
|
if not port_forwardings:
|
||||||
@ -298,6 +301,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
context, [port_forwarding], ri, interface_name, namespace,
|
context, [port_forwarding], ri, interface_name, namespace,
|
||||||
iptables_manager)
|
iptables_manager)
|
||||||
|
|
||||||
|
@coordination.synchronized('port-forwarding-{namespace}')
|
||||||
def _process_update(self, port_forwardings, iptables_manager,
|
def _process_update(self, port_forwardings, iptables_manager,
|
||||||
interface_name, namespace):
|
interface_name, namespace):
|
||||||
if not port_forwardings:
|
if not port_forwardings:
|
||||||
@ -322,6 +326,7 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
iptables_manager.apply()
|
iptables_manager.apply()
|
||||||
self._store_local(port_forwardings, events.UPDATED)
|
self._store_local(port_forwardings, events.UPDATED)
|
||||||
|
|
||||||
|
@coordination.synchronized('port-forwarding-{namespace}')
|
||||||
def _process_delete(self, context, port_forwardings, ri, interface_name,
|
def _process_delete(self, context, port_forwardings, ri, interface_name,
|
||||||
namespace, iptables_manager):
|
namespace, iptables_manager):
|
||||||
if not port_forwardings:
|
if not port_forwardings:
|
||||||
@ -427,7 +432,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
self.check_local_port_forwardings(
|
self.check_local_port_forwardings(
|
||||||
context, ri, ri.fip_managed_by_port_forwardings)
|
context, ri, ri.fip_managed_by_port_forwardings)
|
||||||
|
|
||||||
@lockutils.synchronized('port-forwarding')
|
|
||||||
def add_router(self, context, data):
|
def add_router(self, context, data):
|
||||||
"""Handle a router add event.
|
"""Handle a router add event.
|
||||||
|
|
||||||
@ -438,7 +442,6 @@ class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|||||||
"""
|
"""
|
||||||
self.process_port_forwarding(context, data)
|
self.process_port_forwarding(context, data)
|
||||||
|
|
||||||
@lockutils.synchronized('port-forwarding')
|
|
||||||
def update_router(self, context, data):
|
def update_router(self, context, data):
|
||||||
"""Handle a router update event.
|
"""Handle a router update event.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user