diff --git a/etc/l2gw_plugin.ini b/etc/l2gw_plugin.ini index 3855c6a..e57c6a0 100644 --- a/etc/l2gw_plugin.ini +++ b/etc/l2gw_plugin.ini @@ -10,3 +10,8 @@ # (IntOpt) quota of the l2 gateway # quota_l2_gateway = # Example: quota_l2_gateway = 10 + +# (IntOpt) The periodic interval at which the plugin +# checks for the monitoring L2 gateway agent +# periodic_monitoring_interval = +# Example: periodic_monitoring_interval = 5 diff --git a/networking_l2gw/services/l2gateway/agent/agent_api.py b/networking_l2gw/services/l2gateway/agent/agent_api.py index eeecb9e..26b475d 100644 --- a/networking_l2gw/services/l2gateway/agent/agent_api.py +++ b/networking_l2gw/services/l2gateway/agent/agent_api.py @@ -31,6 +31,6 @@ class L2GatewayAgentApi(object): def update_ovsdb_changes(self, ovsdb_data): cctxt = self.client.prepare() - return cctxt.call(self.context, + return cctxt.cast(self.context, 'update_ovsdb_changes', ovsdb_data=ovsdb_data) diff --git a/networking_l2gw/services/l2gateway/agent/base_agent_manager.py b/networking_l2gw/services/l2gateway/agent/base_agent_manager.py index c4ccffa..74fe13a 100644 --- a/networking_l2gw/services/l2gateway/agent/base_agent_manager.py +++ b/networking_l2gw/services/l2gateway/agent/base_agent_manager.py @@ -28,9 +28,6 @@ from networking_l2gw.services.l2gateway.common import topics LOG = logging.getLogger(__name__) -VALID_L2GW_AGENT_TYPES = [n_const.MONITOR, n_const.TRANSACT, - '+'.join([n_const.MONITOR, n_const.TRANSACT])] - class BaseAgentManager(periodic_task.PeriodicTasks): """Basic agent manager that handles basic RPCs and report states.""" @@ -38,7 +35,7 @@ class BaseAgentManager(periodic_task.PeriodicTasks): def __init__(self, conf=None): super(BaseAgentManager, self).__init__() self.conf = conf or cfg.CONF - self.l2gw_agent_type = n_const.TRANSACT + self.l2gw_agent_type = '' self.use_call = True self.gateways = {} self.context = context.get_admin_context_without_session() @@ -84,14 +81,16 @@ class BaseAgentManager(periodic_task.PeriodicTasks): def agent_updated(self, context, payload): LOG.info(_LI("agent_updated by server side %s!"), payload) - def set_l2gateway_agent_type(self, context, l2gw_agent_type): + def set_monitor_agent(self, context, hostname): """Handle RPC call from plugin to update agent type. RPC call from the plugin to accept that I am a monitoring - or a transact agent. + or a transact agent. This is a fanout cast message """ - if l2gw_agent_type not in VALID_L2GW_AGENT_TYPES: - return n_const.L2GW_INVALID_RPC_MSG_FORMAT - self.l2gw_agent_type = l2gw_agent_type + if hostname == self.conf.host: + self.l2gw_agent_type = n_const.MONITOR + else: + self.l2gw_agent_type = '' + self.agent_state.get('configurations')[n_const.L2GW_AGENT_TYPE ] = self.l2gw_agent_type diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py b/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py index 4f6a488..1d40fde 100644 --- a/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py @@ -88,7 +88,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): @periodic_task.periodic_task(run_immediately=True) def _connect_to_ovsdb_server(self, context): """Initializes the connection to the OVSDB servers.""" - if self.gateways and n_const.MONITOR in self.l2gw_agent_type: + if self.gateways and self.l2gw_agent_type == n_const.MONITOR: for key in self.gateways.keys(): gateway = self.gateways.get(key) ovsdb_fd = gateway.ovsdb_fd diff --git a/networking_l2gw/services/l2gateway/agent_scheduler.py b/networking_l2gw/services/l2gateway/agent_scheduler.py new file mode 100644 index 0000000..2706fa3 --- /dev/null +++ b/networking_l2gw/services/l2gateway/agent_scheduler.py @@ -0,0 +1,129 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.from oslo.config import cfg +from oslo.config import cfg +from oslo.utils import timeutils + +from neutron import context as neutron_context +from neutron.db import agents_db +from neutron.i18n import _LE +from neutron import manager +from neutron.openstack.common import log as logging +from neutron.openstack.common import loopingcall +import random + +from networking_l2gw.services.l2gateway.common import config +from networking_l2gw.services.l2gateway.common import constants as srv_const + +LOG = logging.getLogger(__name__) + + +class L2GatewayAgentScheduler(agents_db.AgentDbMixin): + """L2gateway agent scheduler class. + + This maintains active and inactive agents and + select monitor and transact agents. + """ + context = None + plugin = None + l2gwplugin = None + + def __init__(self, notifier=None): + super(L2GatewayAgentScheduler, self).__init__() + self.notifier = notifier + config.register_l2gw_opts_helper() + self.monitor_interval = cfg.CONF.periodic_monitoring_interval + + def initialize_thread(self): + """Initialization of L2gateway agent scheduler thread.""" + try: + self.context = neutron_context.get_admin_context() + self.l2gwplugin = manager.NeutronManager.get_service_plugins().get( + srv_const.L2GW) + self.plugin = manager.NeutronManager.get_plugin() + monitor_thread = loopingcall.FixedIntervalLoopingCall( + self.monitor_agent_state) + monitor_thread.start( + interval=self.monitor_interval, + initial_delay=random.randint(self.monitor_interval, + self.monitor_interval * 2)) + LOG.debug("Successfully initialized L2gateway agent scheduler" + " thread with loop interval %s", self.monitor_interval) + except Exception: + LOG.error(_LE("Cannot initialize agent scheduler thread")) + + def _select_agent_type(self, agents_to_process): + """Select the Monitor agent.""" + # Various cases to be handled: + # 1. Check if there is a single active L2 gateway agent. + # If only one agent is active, then make it the Monitor agent. + # 2. Else, in the list of the active agents, if there does not + # exist Monitor agent, then make the agent that + # started first as the Monitor agent. + # 3. If multiple Monitor agents exist (case where the Monitor agent + # gets disconnected from the Neutron server and another agent + # becomes the Monitor agent and then the original Monitor agent + # connects back within the agent downtime value), then we need to + # send the fanout message so that only one becomes the Monitor + # agent. + + # Check if there already exists Monitor agent and it's the only one. + monitor_agents = [x for x in agents_to_process + if x['configurations'].get(srv_const.L2GW_AGENT_TYPE) + == srv_const.MONITOR] + if len(monitor_agents) == 1: + return + + # We either have more than one Monitor agent, + # or there does not exist Monitor agent. + # We will decide which agent should be the Monitor agent. + chosen_agent = None + if len(agents_to_process) == 1: + # Only one agent is configured. + # Make it the Monitor agent + chosen_agent = agents_to_process[0] + else: + # Select the agent with the oldest started_at + # timestamp as the Monitor agent. + sorted_active_agents = sorted(agents_to_process, + key=lambda k: k['started_at']) + chosen_agent = sorted_active_agents[0] + self.l2gwplugin.agent_rpc.set_monitor_agent(chosen_agent['host']) + + def monitor_agent_state(self): + """Represents L2gateway agent scheduler thread. + + Maintains list of active and inactive agents based on + the heartbeat recorded. + """ + try: + all_agents = self.plugin.get_agents( + self.context, + filters={'agent_type': [srv_const.AGENT_TYPE_L2GATEWAY]}) + except Exception: + LOG.exception(_LE("Unable to get the agent list. Continuing...")) + return + + # Reset the agents that will be processed for selecting the + # Monitor agent + agents_to_process = [] + for agent in all_agents: + agent_time_stamp = agent['heartbeat_timestamp'] + active = not timeutils.is_older_than(agent_time_stamp, + cfg.CONF.agent_down_time * 2) + if active: + agents_to_process.append(agent) + if agents_to_process: + self._select_agent_type(agents_to_process) + return diff --git a/networking_l2gw/services/l2gateway/common/config.py b/networking_l2gw/services/l2gateway/common/config.py index 6321d8c..7a8fd6d 100644 --- a/networking_l2gw/services/l2gateway/common/config.py +++ b/networking_l2gw/services/l2gateway/common/config.py @@ -52,7 +52,11 @@ L2GW_OPTS = [ cfg.IntOpt('quota_l2_gateway', default=5, help=_('Number of l2 gateways allowed per tenant, ' - '-1 for unlimited')) + '-1 for unlimited')), + cfg.IntOpt('periodic_monitoring_interval', + default=5, + help=_('Periodic interval at which the plugin ' + 'checks for the monitoring L2 gateway agent')) ] diff --git a/networking_l2gw/services/l2gateway/common/constants.py b/networking_l2gw/services/l2gateway/common/constants.py index 33052ca..4e6cb20 100644 --- a/networking_l2gw/services/l2gateway/common/constants.py +++ b/networking_l2gw/services/l2gateway/common/constants.py @@ -25,7 +25,6 @@ ERROR_DICT = {L2GW_INVALID_RPC_MSG_FORMAT: "Invalid RPC message format", "request"} MONITOR = 'monitor' -TRANSACT = 'transact' OVSDB_SCHEMA_NAME = 'hardware_vtep' OVSDB_IDENTIFIER = 'ovsdb_identifier' L2GW_AGENT_TYPE = 'l2gw_agent_type' diff --git a/networking_l2gw/services/l2gateway/exceptions.py b/networking_l2gw/services/l2gateway/exceptions.py index 490ef2d..5312830 100644 --- a/networking_l2gw/services/l2gateway/exceptions.py +++ b/networking_l2gw/services/l2gateway/exceptions.py @@ -60,6 +60,10 @@ class L2GatewayConnectionNotFound(exceptions.NotFound): message = _("The connection %(id)s was not found on the l2 gateway") +class L2GatewayPluginNotFound(exceptions.NotFound): + message = _("Plugin or agent extension not found") + + base.FAULT_MAP.update({L2GatewayInUse: web_exc.HTTPConflict, L2GatewayPortInUse: web_exc.HTTPConflict, L2GatewayConnectionExists: web_exc.HTTPConflict, diff --git a/networking_l2gw/services/l2gateway/plugin.py b/networking_l2gw/services/l2gateway/plugin.py new file mode 100644 index 0000000..68f548c --- /dev/null +++ b/networking_l2gw/services/l2gateway/plugin.py @@ -0,0 +1,96 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.from oslo.config import cfg +from neutron.common import rpc as n_rpc +from neutron import context as ctx +from neutron.db import agents_db +from neutron.openstack.common import log as logging + +from networking_l2gw.db.l2gateway import l2gateway_db +from networking_l2gw.services.l2gateway import agent_scheduler +from networking_l2gw.services.l2gateway.common import constants +from networking_l2gw.services.l2gateway.common import topics + +from oslo.config import cfg +from oslo import messaging + +LOG = logging.getLogger(__name__) + + +class L2GatewayCallbacks(object): + """RPC call back functions for L2gateway.""" + + def __init__(self, plugin): + super(L2GatewayCallbacks, self).__init__() + self.plugin = plugin + + +class L2gatewayAgentApi(object): + """L2gateway plugin to agent RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, context, host): + """Initialize L2gateway plugin.""" + self.context = context + target = messaging.Target(topic=topic, version=self.API_VERSION) + self.client = n_rpc.get_client(target) + + def set_monitor_agent(self, hostname): + """RPC to select Monitor/Transact agent.""" + cctxt = self.client.prepare(fanout=True) + return cctxt.cast(self.context, + 'set_monitor_agent', + hostname=hostname) + + +class L2GatewayPlugin(l2gateway_db.L2GatewayMixin): + + """Implementation of the Neutron l2 gateway Service Plugin. + + This class manages the workflow of L2 gateway request/response. + """ + + supported_extension_aliases = ["l2-gateway", + "l2-gateway-connection"] + + def __init__(self): + """Do the initialization for the l2 gateway service plugin here.""" + self.endpoints = [L2GatewayCallbacks(self), + agents_db.AgentExtRpcCallback()] + self.conn = n_rpc.create_connection(new=True) + self.conn.create_consumer(topics.L2GATEWAY_PLUGIN, + self.endpoints, + fanout=False) + self.conn.consume_in_threads() + context = ctx.get_admin_context() + self.agent_rpc = L2gatewayAgentApi(topics.L2GATEWAY_AGENT, + context, + cfg.CONF.host) + super(L2GatewayPlugin, self).__init__() + LOG.debug("starting l2gateway agent scheduler") + self.start_l2gateway_agent_scheduler() + + def start_l2gateway_agent_scheduler(self): + """Start l2gateway agent scheduler thread.""" + self.agentscheduler = agent_scheduler.L2GatewayAgentScheduler() + self.agentscheduler.initialize_thread() + + def get_plugin_type(self): + """Get type of the plugin.""" + return constants.L2GW + + def get_plugin_description(self): + """Get description of the plugin.""" + return "Neutron L2 gateway Service Plugin" diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/test_agent_api.py b/networking_l2gw/tests/unit/services/l2gateway/agent/test_agent_api.py index 16c815d..60b1626 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/test_agent_api.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/test_agent_api.py @@ -37,5 +37,5 @@ class L2GatewayAgentApiTestCase(base.BaseTestCase): cctxt = mock.Mock() self.agent_rpc.client.prepare.return_value = cctxt self.agent_rpc.update_ovsdb_changes(mock.ANY) - cctxt.call.assert_called_with( + cctxt.cast.assert_called_with( self.ctxt, 'update_ovsdb_changes', ovsdb_data=mock.ANY) diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py b/networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py index a399ba9..d0d1a50 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py @@ -36,9 +36,8 @@ class TestBaseAgentManager(base.BaseTestCase): agent_config.register_agent_state_opts_helper(self.conf) cfg.CONF.set_override('report_interval', 1, 'AGENT') self.context = mock.Mock - mock_conf = mock.Mock() self.l2gw_agent_manager = l2gw_manager.BaseAgentManager( - mock_conf) + cfg.CONF) def test_init(self): with mock.patch.object(agent_api, @@ -47,7 +46,7 @@ class TestBaseAgentManager(base.BaseTestCase): '_setup_state_rpc') as setup_state_rpc: self.l2gw_agent_manager.__init__(mock.Mock()) self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type, - n_const.TRANSACT) + '') self.assertTrue(self.l2gw_agent_manager.admin_state_up) self.assertTrue(setup_state_rpc.called) self.assertTrue(l2_gw_agent_api.called) @@ -92,38 +91,26 @@ class TestBaseAgentManager(base.BaseTestCase): self.l2gw_agent_manager.agent_updated(mock.Mock(), fake_payload) self.assertEqual(1, logger_call.call_count) - def test_set_l2gateway_agent_type_monitor(self): - l2_gw_agent_type = n_const.MONITOR - self.l2gw_agent_manager.set_l2gateway_agent_type( - self.context, l2_gw_agent_type) - self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type, - l2_gw_agent_type) - self.assertEqual( - self.l2gw_agent_manager.agent_state.get( - 'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type) - - def test_set_l2gateway_agent_type_transact(self): - l2_gw_agent_type = n_const.TRANSACT - self.l2gw_agent_manager.set_l2gateway_agent_type( - self.context, l2_gw_agent_type) - self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type, - l2_gw_agent_type) - self.assertEqual( - self.l2gw_agent_manager.agent_state.get( - 'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type) - - def test_set_l2gateway_agent_type_transactmonitor(self): - l2_gw_agent_type = '+'.join([n_const.MONITOR, n_const.TRANSACT]) - self.l2gw_agent_manager.set_l2gateway_agent_type( - self.context, l2_gw_agent_type) - self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type, - l2_gw_agent_type) - self.assertEqual( - self.l2gw_agent_manager.agent_state.get( - 'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type) - - def test_set_l2gateway_agent_type_invalid(self): - l2_gw_agent_type = 'fake_type' - result = self.l2gw_agent_manager.set_l2gateway_agent_type( - self.context, l2_gw_agent_type) - self.assertTrue(result, n_const.L2GW_INVALID_RPC_MSG_FORMAT) + def test_set_monitor_agent_type_monitor(self): + self.l2gw_agent_manager.l2gw_agent_type = '' + self.l2gw_agent_manager.conf.host = 'fake_host' + self.l2gw_agent_manager.set_monitor_agent(self.context, 'fake_host') + self.assertEqual(n_const.MONITOR, + self.l2gw_agent_manager.agent_state. + get('configurations')[n_const.L2GW_AGENT_TYPE]) + self.assertEqual(n_const.MONITOR, + self.l2gw_agent_manager.l2gw_agent_type) + + def test_set_monitor_agent_type_transact(self): + self.l2gw_agent_manager.l2gw_agent_type = '' + self.conf.host = 'fake_host' + self.l2gw_agent_manager.set_monitor_agent( + self.context, 'fake_host1') + self.assertNotEqual(n_const.MONITOR, + self.l2gw_agent_manager.agent_state. + get('configurations')[n_const.L2GW_AGENT_TYPE]) + self.assertEqual('', + self.l2gw_agent_manager.agent_state. + get('configurations')[n_const.L2GW_AGENT_TYPE]) + self.assertEqual('', + self.l2gw_agent_manager.l2gw_agent_type) diff --git a/networking_l2gw/tests/unit/services/l2gateway/test_agent_scheduler.py b/networking_l2gw/tests/unit/services/l2gateway/test_agent_scheduler.py new file mode 100644 index 0000000..cf11613 --- /dev/null +++ b/networking_l2gw/tests/unit/services/l2gateway/test_agent_scheduler.py @@ -0,0 +1,206 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import contextlib +import copy +import datetime + +import mock +from oslo.config import cfg +from oslo.utils import timeutils + +from neutron.common import topics +from neutron import context as neutron_context +from neutron.db import agents_db +from neutron import manager +from neutron.openstack.common import loopingcall +from neutron.plugins.ml2 import rpc +from neutron.tests import base + +from networking_l2gw.services.l2gateway import agent_scheduler +from networking_l2gw.services.l2gateway.common import constants as srv_const + + +def make_active_agent(fake_id, fake_agent_type, config=None): + agent_dict = dict(id=fake_id, + agent_type=fake_agent_type, + host='localhost_' + str(fake_id), + heartbeat_timestamp=timeutils.utcnow(), + started_at=timeutils.utcnow(), + configurations=config) + return agent_dict + + +def make_inactive_agent(fake_id, fake_agent_type, delta, config=None): + agent_dict = dict(id=fake_id, + agent_type=fake_agent_type, + host='remotehost_' + str(fake_id), + heartbeat_timestamp=(timeutils.utcnow() - datetime. + timedelta(delta)), + configurations=config) + return agent_dict + + +class FakePlugin(agents_db.AgentDbMixin): + + def __init__(self): + self.notifier = rpc.AgentNotifierApi(topics.AGENT) + + +class TestAgentScheduler(base.BaseTestCase): + + fake_a_agent_list = [] + fake_i_agent_list = [] + + def setUp(self): + super(TestAgentScheduler, self).setUp() + cfg.CONF.set_override('core_plugin', + "neutron.plugins.ml2.plugin.Ml2Plugin") + self.plugin = FakePlugin() + self.context = neutron_context.get_admin_context() + cfg.CONF.set_override('agent_down_time', 10) + cfg.CONF.set_override('periodic_monitoring_interval', 5) + self.agentsch = agent_scheduler.L2GatewayAgentScheduler(cfg.CONF) + self.agentsch.plugin = self.plugin + self.agentsch.context = self.context + self.agentsch.agent_ext_support = True + self.LOG = agent_scheduler.LOG + + def populate_agent_lists(self, config=None): + self.fake_a_agent_list = [] + self.fake_a_agent_list.append(make_active_agent( + '1000', srv_const.AGENT_TYPE_L2GATEWAY, config)) + + self.fake_i_agent_list = [] + self.fake_i_agent_list.append(make_inactive_agent( + '2000', srv_const.AGENT_TYPE_L2GATEWAY, 52, config)) + + def test_initialize_thread(self): + with contextlib.nested( + mock.patch.object(manager.NeutronManager, 'get_service_plugins'), + mock.patch.object(neutron_context, + 'get_admin_context', + return_value=self.context), + mock.patch.object(manager.NeutronManager, + 'get_plugin', + return_value=self.plugin), + mock.patch.object(loopingcall, 'FixedIntervalLoopingCall'), + mock.patch.object(self.LOG, 'debug'), + mock.patch.object(self.LOG, 'error') + ) as (get_srv_plugin, get_context, get_plugin, loop_call, debug, err): + self.agentsch.initialize_thread() + self.assertTrue(get_srv_plugin.called) + self.assertTrue(get_context.called) + self.assertTrue(get_plugin.called) + self.assertTrue(loop_call.called) + self.assertTrue(debug.called) + self.assertFalse(err.called) + + def test_initialize_thread_get_plugin_exception(self): + with contextlib.nested( + mock.patch.object(manager.NeutronManager, 'get_service_plugins'), + mock.patch.object(neutron_context, + 'get_admin_context', + return_value=self.context), + mock.patch.object(manager.NeutronManager, + 'get_plugin', + side_effect=RuntimeError), + mock.patch.object(loopingcall, 'FixedIntervalLoopingCall'), + mock.patch.object(self.LOG, 'error') + ) as (get_srv_plugin, get_context, get_plugin, loop_call, log_err): + self.agentsch.initialize_thread() + self.assertTrue(get_srv_plugin.called) + self.assertTrue(get_context.called) + self.assertTrue(get_plugin.called) + self.assertFalse(loop_call.called) + self.assertTrue(log_err.called) + + def test_initialize_thread_loop_call_exception(self): + with contextlib.nested( + mock.patch.object(manager.NeutronManager, 'get_service_plugins'), + mock.patch.object(neutron_context, + 'get_admin_context', + return_value=self.context), + mock.patch.object(manager.NeutronManager, + 'get_plugin', + return_value=self.plugin), + mock.patch.object(loopingcall, 'FixedIntervalLoopingCall', + side_effect=RuntimeError), + mock.patch.object(self.LOG, 'error') + ) as (get_srv_plugin, get_context, get_plugin, loop_call, log_err): + self.agentsch.initialize_thread() + self.assertTrue(get_context.called) + self.assertTrue(get_plugin.called) + self.assertTrue(loop_call.called) + self.assertTrue(log_err.called) + + def test_select_agent_type_one_active(self): + config = {'cluster_id': 'foo', + srv_const.L2GW_AGENT_TYPE: ''} + self.populate_agent_lists(config) + + with contextlib.nested( + mock.patch('__builtin__.sorted'), + mock.patch.object(manager, 'NeutronManager'), + mock.patch.object(self.LOG, 'exception') + ) as (mock_sorted, mgr, logger_call): + self.agentsch.l2gwplugin = mock.Mock() + self.agentsch._select_agent_type(self.fake_a_agent_list) + self.agentsch.l2gwplugin.agent_rpc.set_monitor_agent_called_with( + self.fake_a_agent_list[0]['host']) + + def test_select_agent_type_multiple_active(self): + config = {'cluster_id': 'foo', + srv_const.L2GW_AGENT_TYPE: ''} + self.populate_agent_lists(config) + self.fake_a_agent_list.append(make_active_agent( + '1001', srv_const.AGENT_TYPE_L2GATEWAY, config)) + self.agentsch.l2gwplugin = mock.Mock() + + with contextlib.nested( + mock.patch.object(manager, 'NeutronManager'), + mock.patch.object(self.LOG, 'exception') + ) as (mgr, logger_call): + self.agentsch._select_agent_type(self.fake_a_agent_list) + self.agentsch.l2gwplugin.agent_rpc.set_monitor_agent_called_with( + self.fake_a_agent_list[0]['host']) + + def test_monitor_agent_state(self): + config = {'cluster_id': 'foo', + srv_const.L2GW_AGENT_TYPE: ''} + self.populate_agent_lists(config) + fake_all_agent_list = copy.deepcopy(self.fake_i_agent_list) + fake_all_agent_list.extend(self.fake_a_agent_list) + self.fake_a_agent_list.append(make_active_agent( + '1001', srv_const.AGENT_TYPE_L2GATEWAY, config)) + + with contextlib.nested( + mock.patch.object(self.agentsch, '_select_agent_type'), + mock.patch.object(self.plugin, 'get_agents', + return_value=fake_all_agent_list) + ) as (select_agent, get_agent_list): + self.agentsch.monitor_agent_state() + self.assertTrue(get_agent_list.called) + self.assertTrue(select_agent.called) + + def test_monitor_agent_state_exception_get_agents(self): + with contextlib.nested( + mock.patch.object(self.plugin, 'get_agents', + side_effect=Exception), + mock.patch.object(self.LOG, 'exception') + ) as (get_agent_list, exception_log): + self.agentsch.monitor_agent_state() + self.assertTrue(get_agent_list.called) + self.assertTrue(exception_log.called)