L2 gateway agent scheduler.

The patch set contains the code for the L2 gateway
scheduler that determines which L2 gateway agent
is the Monitor agent out of available agents if
there does not exist Monitor agent in the list
of active agents.
If only one L2 gateway agent is active, then
this piece of code makes that agent the Monitor
agent.
If multiple agents are active, then the logic
makes the L2 gateway agent that has the oldest
'started_at' timestamp as the Monitor agent.
It makes use of fanout cast message to send it to
all the agents. The message payload includes the host
name of the Monitor agent so that that particular
agent changes its role to the Monitor, whereas other
agents who receive this message change their role
to default transact agent.
The patch set also removes the term TRANSACT
as one state "MONITOR" is enough to decide
the role.

Co-Authored-By: Maruti Kamat <maruti.kamat@hp.com>

Change-Id: Ib06dbe320021de3653d26f267994ced52480254a
stable/ocata
Koteswara Rao Kelam 8 years ago
parent e5cb347301
commit 0ef8aa7770
  1. 5
      etc/l2gw_plugin.ini
  2. 2
      networking_l2gw/services/l2gateway/agent/agent_api.py
  3. 17
      networking_l2gw/services/l2gateway/agent/base_agent_manager.py
  4. 2
      networking_l2gw/services/l2gateway/agent/ovsdb/manager.py
  5. 129
      networking_l2gw/services/l2gateway/agent_scheduler.py
  6. 6
      networking_l2gw/services/l2gateway/common/config.py
  7. 1
      networking_l2gw/services/l2gateway/common/constants.py
  8. 4
      networking_l2gw/services/l2gateway/exceptions.py
  9. 96
      networking_l2gw/services/l2gateway/plugin.py
  10. 2
      networking_l2gw/tests/unit/services/l2gateway/agent/test_agent_api.py
  11. 63
      networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py
  12. 206
      networking_l2gw/tests/unit/services/l2gateway/test_agent_scheduler.py

@ -10,3 +10,8 @@
# (IntOpt) quota of the l2 gateway
# quota_l2_gateway =
# Example: quota_l2_gateway = 10
# (IntOpt) The periodic interval at which the plugin
# checks for the monitoring L2 gateway agent
# periodic_monitoring_interval =
# Example: periodic_monitoring_interval = 5

@ -31,6 +31,6 @@ class L2GatewayAgentApi(object):
def update_ovsdb_changes(self, ovsdb_data):
cctxt = self.client.prepare()
return cctxt.call(self.context,
return cctxt.cast(self.context,
'update_ovsdb_changes',
ovsdb_data=ovsdb_data)

@ -28,9 +28,6 @@ from networking_l2gw.services.l2gateway.common import topics
LOG = logging.getLogger(__name__)
VALID_L2GW_AGENT_TYPES = [n_const.MONITOR, n_const.TRANSACT,
'+'.join([n_const.MONITOR, n_const.TRANSACT])]
class BaseAgentManager(periodic_task.PeriodicTasks):
"""Basic agent manager that handles basic RPCs and report states."""
@ -38,7 +35,7 @@ class BaseAgentManager(periodic_task.PeriodicTasks):
def __init__(self, conf=None):
super(BaseAgentManager, self).__init__()
self.conf = conf or cfg.CONF
self.l2gw_agent_type = n_const.TRANSACT
self.l2gw_agent_type = ''
self.use_call = True
self.gateways = {}
self.context = context.get_admin_context_without_session()
@ -84,14 +81,16 @@ class BaseAgentManager(periodic_task.PeriodicTasks):
def agent_updated(self, context, payload):
LOG.info(_LI("agent_updated by server side %s!"), payload)
def set_l2gateway_agent_type(self, context, l2gw_agent_type):
def set_monitor_agent(self, context, hostname):
"""Handle RPC call from plugin to update agent type.
RPC call from the plugin to accept that I am a monitoring
or a transact agent.
or a transact agent. This is a fanout cast message
"""
if l2gw_agent_type not in VALID_L2GW_AGENT_TYPES:
return n_const.L2GW_INVALID_RPC_MSG_FORMAT
self.l2gw_agent_type = l2gw_agent_type
if hostname == self.conf.host:
self.l2gw_agent_type = n_const.MONITOR
else:
self.l2gw_agent_type = ''
self.agent_state.get('configurations')[n_const.L2GW_AGENT_TYPE
] = self.l2gw_agent_type

@ -88,7 +88,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
@periodic_task.periodic_task(run_immediately=True)
def _connect_to_ovsdb_server(self, context):
"""Initializes the connection to the OVSDB servers."""
if self.gateways and n_const.MONITOR in self.l2gw_agent_type:
if self.gateways and self.l2gw_agent_type == n_const.MONITOR:
for key in self.gateways.keys():
gateway = self.gateways.get(key)
ovsdb_fd = gateway.ovsdb_fd

@ -0,0 +1,129 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.from oslo.config import cfg
from oslo.config import cfg
from oslo.utils import timeutils
from neutron import context as neutron_context
from neutron.db import agents_db
from neutron.i18n import _LE
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
import random
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as srv_const
LOG = logging.getLogger(__name__)
class L2GatewayAgentScheduler(agents_db.AgentDbMixin):
"""L2gateway agent scheduler class.
This maintains active and inactive agents and
select monitor and transact agents.
"""
context = None
plugin = None
l2gwplugin = None
def __init__(self, notifier=None):
super(L2GatewayAgentScheduler, self).__init__()
self.notifier = notifier
config.register_l2gw_opts_helper()
self.monitor_interval = cfg.CONF.periodic_monitoring_interval
def initialize_thread(self):
"""Initialization of L2gateway agent scheduler thread."""
try:
self.context = neutron_context.get_admin_context()
self.l2gwplugin = manager.NeutronManager.get_service_plugins().get(
srv_const.L2GW)
self.plugin = manager.NeutronManager.get_plugin()
monitor_thread = loopingcall.FixedIntervalLoopingCall(
self.monitor_agent_state)
monitor_thread.start(
interval=self.monitor_interval,
initial_delay=random.randint(self.monitor_interval,
self.monitor_interval * 2))
LOG.debug("Successfully initialized L2gateway agent scheduler"
" thread with loop interval %s", self.monitor_interval)
except Exception:
LOG.error(_LE("Cannot initialize agent scheduler thread"))
def _select_agent_type(self, agents_to_process):
"""Select the Monitor agent."""
# Various cases to be handled:
# 1. Check if there is a single active L2 gateway agent.
# If only one agent is active, then make it the Monitor agent.
# 2. Else, in the list of the active agents, if there does not
# exist Monitor agent, then make the agent that
# started first as the Monitor agent.
# 3. If multiple Monitor agents exist (case where the Monitor agent
# gets disconnected from the Neutron server and another agent
# becomes the Monitor agent and then the original Monitor agent
# connects back within the agent downtime value), then we need to
# send the fanout message so that only one becomes the Monitor
# agent.
# Check if there already exists Monitor agent and it's the only one.
monitor_agents = [x for x in agents_to_process
if x['configurations'].get(srv_const.L2GW_AGENT_TYPE)
== srv_const.MONITOR]
if len(monitor_agents) == 1:
return
# We either have more than one Monitor agent,
# or there does not exist Monitor agent.
# We will decide which agent should be the Monitor agent.
chosen_agent = None
if len(agents_to_process) == 1:
# Only one agent is configured.
# Make it the Monitor agent
chosen_agent = agents_to_process[0]
else:
# Select the agent with the oldest started_at
# timestamp as the Monitor agent.
sorted_active_agents = sorted(agents_to_process,
key=lambda k: k['started_at'])
chosen_agent = sorted_active_agents[0]
self.l2gwplugin.agent_rpc.set_monitor_agent(chosen_agent['host'])
def monitor_agent_state(self):
"""Represents L2gateway agent scheduler thread.
Maintains list of active and inactive agents based on
the heartbeat recorded.
"""
try:
all_agents = self.plugin.get_agents(
self.context,
filters={'agent_type': [srv_const.AGENT_TYPE_L2GATEWAY]})
except Exception:
LOG.exception(_LE("Unable to get the agent list. Continuing..."))
return
# Reset the agents that will be processed for selecting the
# Monitor agent
agents_to_process = []
for agent in all_agents:
agent_time_stamp = agent['heartbeat_timestamp']
active = not timeutils.is_older_than(agent_time_stamp,
cfg.CONF.agent_down_time * 2)
if active:
agents_to_process.append(agent)
if agents_to_process:
self._select_agent_type(agents_to_process)
return

@ -52,7 +52,11 @@ L2GW_OPTS = [
cfg.IntOpt('quota_l2_gateway',
default=5,
help=_('Number of l2 gateways allowed per tenant, '
'-1 for unlimited'))
'-1 for unlimited')),
cfg.IntOpt('periodic_monitoring_interval',
default=5,
help=_('Periodic interval at which the plugin '
'checks for the monitoring L2 gateway agent'))
]

@ -25,7 +25,6 @@ ERROR_DICT = {L2GW_INVALID_RPC_MSG_FORMAT: "Invalid RPC message format",
"request"}
MONITOR = 'monitor'
TRANSACT = 'transact'
OVSDB_SCHEMA_NAME = 'hardware_vtep'
OVSDB_IDENTIFIER = 'ovsdb_identifier'
L2GW_AGENT_TYPE = 'l2gw_agent_type'

@ -60,6 +60,10 @@ class L2GatewayConnectionNotFound(exceptions.NotFound):
message = _("The connection %(id)s was not found on the l2 gateway")
class L2GatewayPluginNotFound(exceptions.NotFound):
message = _("Plugin or agent extension not found")
base.FAULT_MAP.update({L2GatewayInUse: web_exc.HTTPConflict,
L2GatewayPortInUse: web_exc.HTTPConflict,
L2GatewayConnectionExists: web_exc.HTTPConflict,

@ -0,0 +1,96 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.from oslo.config import cfg
from neutron.common import rpc as n_rpc
from neutron import context as ctx
from neutron.db import agents_db
from neutron.openstack.common import log as logging
from networking_l2gw.db.l2gateway import l2gateway_db
from networking_l2gw.services.l2gateway import agent_scheduler
from networking_l2gw.services.l2gateway.common import constants
from networking_l2gw.services.l2gateway.common import topics
from oslo.config import cfg
from oslo import messaging
LOG = logging.getLogger(__name__)
class L2GatewayCallbacks(object):
"""RPC call back functions for L2gateway."""
def __init__(self, plugin):
super(L2GatewayCallbacks, self).__init__()
self.plugin = plugin
class L2gatewayAgentApi(object):
"""L2gateway plugin to agent RPC API."""
API_VERSION = '1.0'
def __init__(self, topic, context, host):
"""Initialize L2gateway plugin."""
self.context = context
target = messaging.Target(topic=topic, version=self.API_VERSION)
self.client = n_rpc.get_client(target)
def set_monitor_agent(self, hostname):
"""RPC to select Monitor/Transact agent."""
cctxt = self.client.prepare(fanout=True)
return cctxt.cast(self.context,
'set_monitor_agent',
hostname=hostname)
class L2GatewayPlugin(l2gateway_db.L2GatewayMixin):
"""Implementation of the Neutron l2 gateway Service Plugin.
This class manages the workflow of L2 gateway request/response.
"""
supported_extension_aliases = ["l2-gateway",
"l2-gateway-connection"]
def __init__(self):
"""Do the initialization for the l2 gateway service plugin here."""
self.endpoints = [L2GatewayCallbacks(self),
agents_db.AgentExtRpcCallback()]
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(topics.L2GATEWAY_PLUGIN,
self.endpoints,
fanout=False)
self.conn.consume_in_threads()
context = ctx.get_admin_context()
self.agent_rpc = L2gatewayAgentApi(topics.L2GATEWAY_AGENT,
context,
cfg.CONF.host)
super(L2GatewayPlugin, self).__init__()
LOG.debug("starting l2gateway agent scheduler")
self.start_l2gateway_agent_scheduler()
def start_l2gateway_agent_scheduler(self):
"""Start l2gateway agent scheduler thread."""
self.agentscheduler = agent_scheduler.L2GatewayAgentScheduler()
self.agentscheduler.initialize_thread()
def get_plugin_type(self):
"""Get type of the plugin."""
return constants.L2GW
def get_plugin_description(self):
"""Get description of the plugin."""
return "Neutron L2 gateway Service Plugin"

@ -37,5 +37,5 @@ class L2GatewayAgentApiTestCase(base.BaseTestCase):
cctxt = mock.Mock()
self.agent_rpc.client.prepare.return_value = cctxt
self.agent_rpc.update_ovsdb_changes(mock.ANY)
cctxt.call.assert_called_with(
cctxt.cast.assert_called_with(
self.ctxt, 'update_ovsdb_changes', ovsdb_data=mock.ANY)

@ -36,9 +36,8 @@ class TestBaseAgentManager(base.BaseTestCase):
agent_config.register_agent_state_opts_helper(self.conf)
cfg.CONF.set_override('report_interval', 1, 'AGENT')
self.context = mock.Mock
mock_conf = mock.Mock()
self.l2gw_agent_manager = l2gw_manager.BaseAgentManager(
mock_conf)
cfg.CONF)
def test_init(self):
with mock.patch.object(agent_api,
@ -47,7 +46,7 @@ class TestBaseAgentManager(base.BaseTestCase):
'_setup_state_rpc') as setup_state_rpc:
self.l2gw_agent_manager.__init__(mock.Mock())
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
n_const.TRANSACT)
'')
self.assertTrue(self.l2gw_agent_manager.admin_state_up)
self.assertTrue(setup_state_rpc.called)
self.assertTrue(l2_gw_agent_api.called)
@ -92,38 +91,26 @@ class TestBaseAgentManager(base.BaseTestCase):
self.l2gw_agent_manager.agent_updated(mock.Mock(), fake_payload)
self.assertEqual(1, logger_call.call_count)
def test_set_l2gateway_agent_type_monitor(self):
l2_gw_agent_type = n_const.MONITOR
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_transact(self):
l2_gw_agent_type = n_const.TRANSACT
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_transactmonitor(self):
l2_gw_agent_type = '+'.join([n_const.MONITOR, n_const.TRANSACT])
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_invalid(self):
l2_gw_agent_type = 'fake_type'
result = self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertTrue(result, n_const.L2GW_INVALID_RPC_MSG_FORMAT)
def test_set_monitor_agent_type_monitor(self):
self.l2gw_agent_manager.l2gw_agent_type = ''
self.l2gw_agent_manager.conf.host = 'fake_host'
self.l2gw_agent_manager.set_monitor_agent(self.context, 'fake_host')
self.assertEqual(n_const.MONITOR,
self.l2gw_agent_manager.agent_state.
get('configurations')[n_const.L2GW_AGENT_TYPE])
self.assertEqual(n_const.MONITOR,
self.l2gw_agent_manager.l2gw_agent_type)
def test_set_monitor_agent_type_transact(self):
self.l2gw_agent_manager.l2gw_agent_type = ''
self.conf.host = 'fake_host'
self.l2gw_agent_manager.set_monitor_agent(
self.context, 'fake_host1')
self.assertNotEqual(n_const.MONITOR,
self.l2gw_agent_manager.agent_state.
get('configurations')[n_const.L2GW_AGENT_TYPE])
self.assertEqual('',
self.l2gw_agent_manager.agent_state.
get('configurations')[n_const.L2GW_AGENT_TYPE])
self.assertEqual('',
self.l2gw_agent_manager.l2gw_agent_type)

@ -0,0 +1,206 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import contextlib
import copy
import datetime
import mock
from oslo.config import cfg
from oslo.utils import timeutils
from neutron.common import topics
from neutron import context as neutron_context
from neutron.db import agents_db
from neutron import manager
from neutron.openstack.common import loopingcall
from neutron.plugins.ml2 import rpc
from neutron.tests import base
from networking_l2gw.services.l2gateway import agent_scheduler
from networking_l2gw.services.l2gateway.common import constants as srv_const
def make_active_agent(fake_id, fake_agent_type, config=None):
agent_dict = dict(id=fake_id,
agent_type=fake_agent_type,
host='localhost_' + str(fake_id),
heartbeat_timestamp=timeutils.utcnow(),
started_at=timeutils.utcnow(),
configurations=config)
return agent_dict
def make_inactive_agent(fake_id, fake_agent_type, delta, config=None):
agent_dict = dict(id=fake_id,
agent_type=fake_agent_type,
host='remotehost_' + str(fake_id),
heartbeat_timestamp=(timeutils.utcnow() - datetime.
timedelta(delta)),
configurations=config)
return agent_dict
class FakePlugin(agents_db.AgentDbMixin):
def __init__(self):
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
class TestAgentScheduler(base.BaseTestCase):
fake_a_agent_list = []
fake_i_agent_list = []
def setUp(self):
super(TestAgentScheduler, self).setUp()
cfg.CONF.set_override('core_plugin',
"neutron.plugins.ml2.plugin.Ml2Plugin")
self.plugin = FakePlugin()
self.context = neutron_context.get_admin_context()
cfg.CONF.set_override('agent_down_time', 10)
cfg.CONF.set_override('periodic_monitoring_interval', 5)
self.agentsch = agent_scheduler.L2GatewayAgentScheduler(cfg.CONF)
self.agentsch.plugin = self.plugin
self.agentsch.context = self.context
self.agentsch.agent_ext_support = True
self.LOG = agent_scheduler.LOG
def populate_agent_lists(self, config=None):
self.fake_a_agent_list = []
self.fake_a_agent_list.append(make_active_agent(
'1000', srv_const.AGENT_TYPE_L2GATEWAY, config))
self.fake_i_agent_list = []
self.fake_i_agent_list.append(make_inactive_agent(
'2000', srv_const.AGENT_TYPE_L2GATEWAY, 52, config))
def test_initialize_thread(self):
with contextlib.nested(
mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
mock.patch.object(neutron_context,
'get_admin_context',
return_value=self.context),
mock.patch.object(manager.NeutronManager,
'get_plugin',
return_value=self.plugin),
mock.patch.object(loopingcall, 'FixedIntervalLoopingCall'),
mock.patch.object(self.LOG, 'debug'),
mock.patch.object(self.LOG, 'error')
) as (get_srv_plugin, get_context, get_plugin, loop_call, debug, err):
self.agentsch.initialize_thread()
self.assertTrue(get_srv_plugin.called)
self.assertTrue(get_context.called)
self.assertTrue(get_plugin.called)
self.assertTrue(loop_call.called)
self.assertTrue(debug.called)
self.assertFalse(err.called)
def test_initialize_thread_get_plugin_exception(self):
with contextlib.nested(
mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
mock.patch.object(neutron_context,
'get_admin_context',
return_value=self.context),
mock.patch.object(manager.NeutronManager,
'get_plugin',
side_effect=RuntimeError),
mock.patch.object(loopingcall, 'FixedIntervalLoopingCall'),
mock.patch.object(self.LOG, 'error')
) as (get_srv_plugin, get_context, get_plugin, loop_call, log_err):
self.agentsch.initialize_thread()
self.assertTrue(get_srv_plugin.called)
self.assertTrue(get_context.called)
self.assertTrue(get_plugin.called)
self.assertFalse(loop_call.called)
self.assertTrue(log_err.called)
def test_initialize_thread_loop_call_exception(self):
with contextlib.nested(
mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
mock.patch.object(neutron_context,
'get_admin_context',
return_value=self.context),
mock.patch.object(manager.NeutronManager,
'get_plugin',
return_value=self.plugin),
mock.patch.object(loopingcall, 'FixedIntervalLoopingCall',
side_effect=RuntimeError),
mock.patch.object(self.LOG, 'error')
) as (get_srv_plugin, get_context, get_plugin, loop_call, log_err):
self.agentsch.initialize_thread()
self.assertTrue(get_context.called)
self.assertTrue(get_plugin.called)
self.assertTrue(loop_call.called)
self.assertTrue(log_err.called)
def test_select_agent_type_one_active(self):
config = {'cluster_id': 'foo',
srv_const.L2GW_AGENT_TYPE: ''}
self.populate_agent_lists(config)
with contextlib.nested(
mock.patch('__builtin__.sorted'),
mock.patch.object(manager, 'NeutronManager'),
mock.patch.object(self.LOG, 'exception')
) as (mock_sorted, mgr, logger_call):
self.agentsch.l2gwplugin = mock.Mock()
self.agentsch._select_agent_type(self.fake_a_agent_list)
self.agentsch.l2gwplugin.agent_rpc.set_monitor_agent_called_with(
self.fake_a_agent_list[0]['host'])
def test_select_agent_type_multiple_active(self):
config = {'cluster_id': 'foo',
srv_const.L2GW_AGENT_TYPE: ''}
self.populate_agent_lists(config)
self.fake_a_agent_list.append(make_active_agent(
'1001', srv_const.AGENT_TYPE_L2GATEWAY, config))
self.agentsch.l2gwplugin = mock.Mock()
with contextlib.nested(
mock.patch.object(manager, 'NeutronManager'),
mock.patch.object(self.LOG, 'exception')
) as (mgr, logger_call):
self.agentsch._select_agent_type(self.fake_a_agent_list)
self.agentsch.l2gwplugin.agent_rpc.set_monitor_agent_called_with(
self.fake_a_agent_list[0]['host'])
def test_monitor_agent_state(self):
config = {'cluster_id': 'foo',
srv_const.L2GW_AGENT_TYPE: ''}
self.populate_agent_lists(config)
fake_all_agent_list = copy.deepcopy(self.fake_i_agent_list)
fake_all_agent_list.extend(self.fake_a_agent_list)
self.fake_a_agent_list.append(make_active_agent(
'1001', srv_const.AGENT_TYPE_L2GATEWAY, config))
with contextlib.nested(
mock.patch.object(self.agentsch, '_select_agent_type'),
mock.patch.object(self.plugin, 'get_agents',
return_value=fake_all_agent_list)
) as (select_agent, get_agent_list):
self.agentsch.monitor_agent_state()
self.assertTrue(get_agent_list.called)
self.assertTrue(select_agent.called)
def test_monitor_agent_state_exception_get_agents(self):
with contextlib.nested(
mock.patch.object(self.plugin, 'get_agents',
side_effect=Exception),
mock.patch.object(self.LOG, 'exception')
) as (get_agent_list, exception_log):
self.agentsch.monitor_agent_state()
self.assertTrue(get_agent_list.called)
self.assertTrue(exception_log.called)
Loading…
Cancel
Save