L3 Agent restructure - observer hierarchy

This commit creates the basic observer hierarchy for advanced
services, as part of a multi-step refactoring effort of the L3 agent.

The change set has these modifications:

- Device drivers for VPN and FW services are loaded.
- AdvancedService child instances are created for VPN and FW.
- L3EventObservers is created by the L3 agent and the VPN and FW
  service objects are registered for notifications of events.
- VPN device driver event handlers moved to VPN service instance.
- VPN device driver callbacks to VPN agent, moved to VPN service.
  to service from the VPN device driver.
- Test cases updated and moved related to these changes.
- UT updated to test new methods and refactoring changes.

Future commits will massage the event notification points in the
L3 agent, and implement handlers.

Please keep these things in mind, when reviewing:
- The goal is to refactor the code and not 'improve/change'
  functionality (other than broken tests).
- This is one step of a series of steps to move to the 'final'
  restructuring.
- Because we're incrementally changing the code, there may be
  temporary changes to allow existing code to continue to work.

Co-Authored-By: Assaf Muller

Change-Id: I674c72e37b56aa1f729110310e6f697297c47c09
Partially-implements: blueprint restructure-l3-agent
This commit is contained in:
Paul Michali 2014-12-10 18:50:56 -05:00
parent d7c5306360
commit d264666bba
6 changed files with 354 additions and 6 deletions

View File

@ -27,6 +27,7 @@ from oslo.utils import importutils
from oslo.utils import timeutils
from neutron.agent.common import config
from neutron.agent.l3 import event_observers
from neutron.agent.l3 import ha
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import router_info
@ -53,6 +54,7 @@ from neutron.openstack.common import periodic_task
from neutron.openstack.common import processutils
from neutron.openstack.common import service
from neutron import service as neutron_service
from neutron.services import advanced_service as adv_svc
try:
from neutron_fwaas.services.firewall.agents.l3reference \
import firewall_l3_agent
@ -270,6 +272,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self.fip_priorities = set(range(FIP_PR_START, FIP_PR_END))
self._queue = queue.RouterProcessingQueue()
self.event_observers = event_observers.L3EventObservers()
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
@ -460,6 +463,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
router=router,
use_ipv6=self.use_ipv6,
ns_name=ns_name)
self.event_observers.notify(
adv_svc.AdvancedService.before_router_added, ri)
self.router_info[router_id] = ri
if self.conf.use_namespaces:
self._create_router_namespace(ri)
@ -486,6 +492,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
"Skipping router removal"), router_id)
return
self.event_observers.notify(
adv_svc.AdvancedService.before_router_removed, ri)
if ri.is_ha:
self.process_ha_router_removed(ri)
@ -501,6 +510,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
del self.router_info[router_id]
self._destroy_router_namespace(ri.ns_name)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_removed, ri)
def _get_metadata_proxy_callback(self, router_id):
def callback(pid_file):
@ -1506,10 +1518,28 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
router_id=router['id'])
if router['id'] not in self.router_info:
self._router_added(router['id'], router)
self._process_added_router(router)
else:
self._process_updated_router(router)
def _process_added_router(self, router):
# TODO(pcm): Next refactoring will rework this logic
self._router_added(router['id'], router)
ri = self.router_info[router['id']]
ri.router = router
self.process_router(ri)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_added, ri)
def _process_updated_router(self, router):
# TODO(pcm): Next refactoring will rework this logic
ri = self.router_info[router['id']]
ri.router = router
self.event_observers.notify(
adv_svc.AdvancedService.before_router_updated, ri)
self.process_router(ri)
self.event_observers.notify(
adv_svc.AdvancedService.after_router_updated, ri)
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():

View File

@ -0,0 +1,35 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class L3EventObservers(object):
"""Manages observers for L3 agent events."""
def __init__(self):
self.observers = set()
def add(self, observer):
"""Add a listener for L3 agent notifications."""
self.observers.add(observer)
def notify(self, l3_event_action, *args, **kwargs):
"""Give interested parties a chance to act on event.
NOTE: Preserves existing behavior for error propagation.
"""
method_name = l3_event_action.__name__
for observer in self.observers:
getattr(observer, method_name)(*args, **kwargs)

View File

@ -0,0 +1,94 @@
# Copyright 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class AdvancedService(object):
"""Observer base class for Advanced Services.
Base class for service types. This should not be instantiated normally.
Instead, a child class is defined for each service type and instantiated
by the corresponding service agent. The instances will have a back
reference to the L3 agent, and will register as an observer of events.
A singleton is used to create only one service object per service type.
This base class provides a definition for all of the L3 event handlers
that a service could "observe". A child class for a service type will
implement handlers, for events of interest.
"""
_instance = None
def __init__(self, l3_agent):
"""Base class for an advanced service.
Do not directly instantiate objects of this class. Should only be
called indirectly by a child class's instance() invocation.
"""
self.l3_agent = l3_agent
# NOTE: Copying L3 agent attributes, so that they are accessible
# from device drivers, which are now provided a service instance.
# TODO(pcm): Address this in future refactorings.
self.conf = l3_agent.conf
self.root_helper = l3_agent.root_helper
@classmethod
def instance(cls, l3_agent):
"""Creates instance (singleton) of service.
Do not directly call this for the base class. Instead, it should be
called by a child class, that represents a specific service type.
This ensures that only one instance is created for all agents of a
specific service type.
"""
if not cls._instance:
with lockutils.lock('instance'):
if not cls._instance:
cls._instance = cls(l3_agent)
return cls._instance
# NOTE: Handler definitions for events generated by the L3 agent.
# Subclasses of AdvancedService can override these to perform service
# specific actions. Unique methods are defined for add/update, as
# some services may want to take different actions.
def before_router_added(self, ri):
"""Actions taken before router_info created."""
pass
def after_router_added(self, ri):
"""Actions taken after router_info created."""
pass
def before_router_updated(self, ri):
"""Actions before processing for an updated router."""
pass
def after_router_updated(self, ri):
"""Actions add processing for an updated router."""
pass
def before_router_removed(self, ri):
"""Actions before removing router."""
pass
def after_router_removed(self, ri):
"""Actions after processing and removing router."""
pass

View File

@ -27,6 +27,7 @@ from neutron.common import config as common_config
from neutron.common import constants as l3_constants
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.services import advanced_service as adv_svc
from neutron.tests.common.agents import l3_agent as l3_test_agent
from neutron.tests.functional.agent.linux import base
from neutron.tests.unit import test_l3_agent
@ -93,11 +94,8 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
return ri
def _create_router(self, agent, router):
agent._router_added(router['id'], router)
ri = agent.router_info[router['id']]
ri.router = router
agent.process_router(ri)
return ri
agent._process_added_router(router)
return agent.router_info[router['id']]
def _delete_router(self, agent, router_id):
agent._router_removed(router_id)
@ -190,6 +188,31 @@ vrrp_instance VR_1 {
class L3AgentTestCase(L3AgentTestFramework):
def test_observer_notifications_legacy_router(self):
self._test_observer_notifications(enable_ha=False)
def test_observer_notifications_ha_router(self):
self._test_observer_notifications(enable_ha=True)
def _test_observer_notifications(self, enable_ha):
"""Test create, update, delete of router and notifications."""
with mock.patch.object(
self.agent.event_observers, 'notify') as notify:
router_info = self.generate_router_info(enable_ha)
router = self.manage_router(self.agent, router_info)
self.agent._process_updated_router(router.router)
self._delete_router(self.agent, router.router_id)
calls = notify.call_args_list
self.assertEqual(
[((adv_svc.AdvancedService.before_router_added, router),),
((adv_svc.AdvancedService.after_router_added, router),),
((adv_svc.AdvancedService.before_router_updated, router),),
((adv_svc.AdvancedService.after_router_updated, router),),
((adv_svc.AdvancedService.before_router_removed, router),),
((adv_svc.AdvancedService.after_router_removed, router),)],
calls)
def test_legacy_router_lifecycle(self):
self._router_lifecycle(enable_ha=False)

View File

@ -0,0 +1,74 @@
# Copyright 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.l3 import event_observers
from neutron.services import advanced_service as adv_svc
from neutron.tests import base
class DummyService1(adv_svc.AdvancedService):
def before_router_added(self, ri):
pass
def after_router_added(self, ri):
pass
class DummyService2(adv_svc.AdvancedService):
def before_router_added(self, ri):
pass
class TestL3EventObservers(base.BaseTestCase):
def setUp(self):
super(TestL3EventObservers, self).setUp()
self.event_observers = event_observers.L3EventObservers()
def test_add_observer(self):
observer = object()
self.assertNotIn(observer, self.event_observers.observers)
self.event_observers.add(observer)
self.assertIn(observer, self.event_observers.observers)
def test_add_duplicate_observer_is_ignored(self):
observer = object()
self.event_observers.add(observer)
try:
self.event_observers.add(observer)
except Exception:
self.fail('Duplicate additions of observers should be ignored')
self.assertEqual(1, len(self.event_observers.observers))
def test_observers_in_service_notified(self):
"""Test that correct handlers for multiple services are called."""
l3_agent = mock.Mock()
router_info = mock.Mock()
observer1 = DummyService1.instance(l3_agent)
observer2 = DummyService2.instance(l3_agent)
observer1_before_add = mock.patch.object(
DummyService1, 'before_router_added').start()
observer2_before_add = mock.patch.object(
DummyService2, 'before_router_added').start()
self.event_observers.add(observer1)
self.event_observers.add(observer2)
self.event_observers.notify(
adv_svc.AdvancedService.before_router_added, router_info)
observer1_before_add.assert_called_with(router_info)
observer2_before_add.assert_called_with(router_info)

View File

@ -0,0 +1,92 @@
# Copyright 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.l3 import event_observers
from neutron.services import advanced_service
from neutron.tests import base
class FakeServiceA(advanced_service.AdvancedService):
pass
class FakeServiceB(advanced_service.AdvancedService):
pass
class TestAdvancedService(base.BaseTestCase):
def setUp(self):
super(TestAdvancedService, self).setUp()
self.agent = mock.Mock()
self.test_observers = event_observers.L3EventObservers()
# Ensure no instances for each test
FakeServiceA._instance = None
FakeServiceB._instance = None
def test_create_service(self):
"""Test agent saved and service added to observer list."""
my_service = FakeServiceA.instance(self.agent)
self.test_observers.add(my_service)
self.assertIn(my_service, self.test_observers.observers)
self.assertEqual(self.agent, my_service.l3_agent)
def test_service_is_singleton(self):
"""Test that two services of same time use same instance."""
a1 = FakeServiceA.instance(self.agent)
a2 = FakeServiceA.instance(self.agent)
self.assertIs(a1, a2)
def test_shared_observers_for_different_services(self):
"""Test different service type instances created.
The services are unique instances, with different agents, but
sharing the same observer list.
"""
a = FakeServiceA.instance(self.agent)
self.test_observers.add(a)
self.assertEqual(self.agent, a.l3_agent)
self.assertIn(a, self.test_observers.observers)
another_agent = mock.Mock()
b = FakeServiceB.instance(another_agent)
self.test_observers.add(b)
self.assertNotEqual(a, b)
self.assertEqual(another_agent, b.l3_agent)
self.assertIn(b, self.test_observers.observers)
self.assertEqual(2, len(self.test_observers.observers))
def test_unique_observers_for_different_services(self):
"""Test different service types with different observer lists.
The services are unique instances, shared the same agent, but
are using different observer lists.
"""
a = FakeServiceA.instance(self.agent)
self.test_observers.add(a)
other_observers = event_observers.L3EventObservers()
b = FakeServiceB.instance(self.agent)
other_observers.add(b)
self.assertNotEqual(a, b)
self.assertEqual(self.agent, a.l3_agent)
self.assertIn(a, self.test_observers.observers)
self.assertEqual(1, len(self.test_observers.observers))
self.assertEqual(self.agent, b.l3_agent)
self.assertIn(b, other_observers.observers)
self.assertEqual(1, len(other_observers.observers))