Modify dhcp agent for agent management extension

2nd part of blueprint quantum-scheduler

Remove openstack openstack listener on DHCP agent side.
Add DHCPagent notifier on quantum server side.

Change-Id: I196691650a99ba865bf06081a1fc4546f9fac7bd
This commit is contained in:
gongysh 2013-02-18 16:42:34 +08:00
parent c5d169f36c
commit 8869725c65
12 changed files with 288 additions and 179 deletions

View File

@ -36,6 +36,7 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
# be activated when the subnet gateway_ip is None. The guest instance must # be activated when the subnet gateway_ip is None. The guest instance must
# be configured to request host routes via DHCP (Option 121). # be configured to request host routes via DHCP (Option 121).
# enable_isolated_metadata = False # enable_isolated_metadata = False
# Allows for serving metadata requests coming from a dedicated metadata # Allows for serving metadata requests coming from a dedicated metadata
# access network whose cidr is 169.254.169.254/16 (or larger prefix), and # access network whose cidr is 169.254.169.254/16 (or larger prefix), and
# is connected to a Quantum router from which the VMs send metadata # is connected to a Quantum router from which the VMs send metadata
@ -43,3 +44,6 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
# they will be able to reach 169.254.169.254 through a router. # they will be able to reach 169.254.169.254 through a router.
# This option requires enable_isolated_metadata = True # This option requires enable_isolated_metadata = True
# enable_metadata_network = False # enable_metadata_network = False
# The Quantum DHCP agent manager.
# dhcp_agent_manager = quantum.agent.dhcp_agent.DhcpAgentWithStateReport

View File

@ -64,6 +64,9 @@ api_paste_config = api-paste.ini
# DHCP Lease duration (in seconds) # DHCP Lease duration (in seconds)
# dhcp_lease_duration = 120 # dhcp_lease_duration = 120
# Allow sending resource operation notification to DHCP agent
# dhcp_agent_notification = True
# Enable or disable bulk create/update/delete operations # Enable or disable bulk create/update/delete operations
# allow_bulk = True # allow_bulk = True
# Enable or disable overlapping IPs for subnets # Enable or disable overlapping IPs for subnets

View File

@ -33,11 +33,15 @@ from quantum.common import constants
from quantum.common import exceptions from quantum.common import exceptions
from quantum.common import topics from quantum.common import topics
from quantum import context from quantum import context
from quantum import manager
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import proxy from quantum.openstack.common.rpc import proxy
from quantum.openstack.common import service
from quantum.openstack.common import uuidutils from quantum.openstack.common import uuidutils
from quantum import service as quantum_service
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
NS_PREFIX = 'qdhcp-' NS_PREFIX = 'qdhcp-'
@ -46,7 +50,7 @@ METADATA_DEFAULT_IP = '169.254.169.254/%d' % METADATA_DEFAULT_PREFIX
METADATA_PORT = 80 METADATA_PORT = 80
class DhcpAgent(object): class DhcpAgent(manager.Manager):
OPTS = [ OPTS = [
cfg.IntOpt('resync_interval', default=5, cfg.IntOpt('resync_interval', default=5,
help=_("Interval to resync.")), help=_("Interval to resync.")),
@ -60,29 +64,34 @@ class DhcpAgent(object):
cfg.BoolOpt('enable_metadata_network', default=False, cfg.BoolOpt('enable_metadata_network', default=False,
help=_("Allows for serving metadata requests from a " help=_("Allows for serving metadata requests from a "
"dedicate network. Requires " "dedicate network. Requires "
"enable isolated_metadata = True ")) "enable isolated_metadata = True ")),
cfg.StrOpt('dhcp_agent_manager',
default='quantum.agent.dhcp_agent.'
'DhcpAgentWithStateReport',
help=_("The Quantum DHCP agent manager.")),
] ]
def __init__(self, conf): def __init__(self, host=None):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync = False self.needs_resync = False
self.conf = conf self.conf = cfg.CONF
self.cache = NetworkCache() self.cache = NetworkCache()
self.root_helper = config.get_root_helper(conf) self.root_helper = config.get_root_helper(self.conf)
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
ctx = context.get_admin_context_without_session() ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx) self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
self.device_manager = DeviceManager(self.conf, self.plugin_rpc) self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
self.notifications = agent_rpc.NotificationDispatcher()
self.lease_relay = DhcpLeaseRelay(self.update_lease) self.lease_relay = DhcpLeaseRelay(self.update_lease)
def after_start(self):
self.run()
LOG.info(_("DHCP agent started"))
def run(self): def run(self):
"""Activate the DHCP agent.""" """Activate the DHCP agent."""
self.sync_state() self.sync_state()
self.periodic_resync() self.periodic_resync()
self.lease_relay.start() self.lease_relay.start()
self.notifications.run_dispatch(self)
def _ns_name(self, network): def _ns_name(self, network):
if self.conf.use_namespaces: if self.conf.use_namespaces:
@ -199,12 +208,12 @@ class DhcpAgent(object):
else: else:
self.disable_dhcp_helper(network.id) self.disable_dhcp_helper(network.id)
def network_create_end(self, payload): def network_create_end(self, context, payload):
"""Handle the network.create.end notification event.""" """Handle the network.create.end notification event."""
network_id = payload['network']['id'] network_id = payload['network']['id']
self.enable_dhcp_helper(network_id) self.enable_dhcp_helper(network_id)
def network_update_end(self, payload): def network_update_end(self, context, payload):
"""Handle the network.update.end notification event.""" """Handle the network.update.end notification event."""
network_id = payload['network']['id'] network_id = payload['network']['id']
if payload['network']['admin_state_up']: if payload['network']['admin_state_up']:
@ -212,11 +221,11 @@ class DhcpAgent(object):
else: else:
self.disable_dhcp_helper(network_id) self.disable_dhcp_helper(network_id)
def network_delete_end(self, payload): def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event.""" """Handle the network.delete.end notification event."""
self.disable_dhcp_helper(payload['network_id']) self.disable_dhcp_helper(payload['network_id'])
def subnet_update_end(self, payload): def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event.""" """Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id'] network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id) self.refresh_dhcp_helper(network_id)
@ -224,14 +233,14 @@ class DhcpAgent(object):
# Use the update handler for the subnet create event. # Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end subnet_create_end = subnet_update_end
def subnet_delete_end(self, payload): def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event.""" """Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id'] subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id) network = self.cache.get_network_by_subnet_id(subnet_id)
if network: if network:
self.refresh_dhcp_helper(network.id) self.refresh_dhcp_helper(network.id)
def port_update_end(self, payload): def port_update_end(self, context, payload):
"""Handle the port.update.end notification event.""" """Handle the port.update.end notification event."""
port = DictModel(payload['port']) port = DictModel(payload['port'])
network = self.cache.get_network_by_id(port.network_id) network = self.cache.get_network_by_id(port.network_id)
@ -242,7 +251,7 @@ class DhcpAgent(object):
# Use the update handler for the port create event. # Use the update handler for the port create event.
port_create_end = port_update_end port_create_end = port_update_end
def port_delete_end(self, payload): def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event.""" """Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id']) port = self.cache.get_port_by_id(payload['port_id'])
if port: if port:
@ -434,6 +443,19 @@ class NetworkCache(object):
if port.id == port_id: if port.id == port_id:
return port return port
def get_state(self):
net_ids = self.get_network_ids()
num_nets = len(net_ids)
num_subnets = 0
num_ports = 0
for net_id in net_ids:
network = self.get_network_by_id(net_id)
num_subnets += len(network.subnets)
num_ports += len(network.ports)
return {'networks': num_nets,
'subnets': num_subnets,
'ports': num_ports}
class DeviceManager(object): class DeviceManager(object):
OPTS = [ OPTS = [
@ -626,9 +648,46 @@ class DhcpLeaseRelay(object):
eventlet.spawn(eventlet.serve, listener, self._handler) eventlet.spawn(eventlet.serve, listener, self._handler)
class DhcpAgentWithStateReport(DhcpAgent):
def __init__(self, host=None):
super(DhcpAgentWithStateReport, self).__init__(host=host)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
self.agent_state = {
'binary': 'quantum-dhcp-agent',
'host': host,
'topic': topics.DHCP_AGENT,
'configurations': {
'dhcp_driver': cfg.CONF.dhcp_driver,
'use_namespaces': cfg.CONF.use_namespaces,
'dhcp_lease_time': cfg.CONF.dhcp_lease_time},
'start_flag': True,
'agent_type': constants.AGENT_TYPE_DHCP}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.LoopingCall(self._report_state)
heartbeat.start(interval=report_interval)
def _report_state(self):
try:
self.agent_state.get('configurations').update(
self.cache.get_state())
ctx = context.get_admin_context_without_session()
self.state_rpc.report_state(ctx,
self.agent_state)
except Exception:
LOG.exception(_("Failed reporting state!"))
return
if self.agent_state.pop('start_flag', None):
self.run()
def after_start(self):
LOG.info(_("DHCP agent started"))
def main(): def main():
eventlet.monkey_patch() eventlet.monkey_patch()
cfg.CONF.register_opts(DhcpAgent.OPTS) cfg.CONF.register_opts(DhcpAgent.OPTS)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF) config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(DeviceManager.OPTS) cfg.CONF.register_opts(DeviceManager.OPTS)
cfg.CONF.register_opts(DhcpLeaseRelay.OPTS) cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
@ -636,6 +695,8 @@ def main():
cfg.CONF.register_opts(interface.OPTS) cfg.CONF.register_opts(interface.OPTS)
cfg.CONF(project='quantum') cfg.CONF(project='quantum')
config.setup_logging(cfg.CONF) config.setup_logging(cfg.CONF)
server = quantum_service.Service.create(
mgr = DhcpAgent(cfg.CONF) binary='quantum-dhcp-agent',
mgr.run() topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval)
service.launch(server).wait()

View File

@ -100,34 +100,3 @@ class PluginApi(proxy.RpcProxy):
return self.call(context, return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip), self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
topic=self.topic) topic=self.topic)
class NotificationDispatcher(object):
def __init__(self):
# Set the Queue size to 1 so that messages stay on server rather than
# being buffered in the process.
self.queue = eventlet.queue.Queue(1)
self.connection = rpc.create_connection(new=True)
topic = '%s.%s' % (rpc_notifier.CONF.notification_topics[0],
api.CONF.default_notification_level.lower())
queue_name = 'notification_listener_%s' % uuidutils.generate_uuid()
self.connection.declare_topic_consumer(topic=topic,
queue_name=queue_name,
callback=self._add_to_queue)
self.connection.consume_in_thread()
def _add_to_queue(self, msg):
self.queue.put(msg)
def run_dispatch(self, handler):
while True:
msg = self.queue.get()
name = msg['event_type'].replace('.', '_')
try:
if hasattr(handler, name):
getattr(handler, name)(msg['payload'])
else:
LOG.debug(_('Unknown event_type: %s.'), msg['event_type'])
except Exception, e:
LOG.warn(_('Error processing message. Exception: %s'), e)

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,70 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum.common import topics
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class DhcpAgentNotifyAPI(proxy.RpcProxy):
"""API for plugin to notify DHCP agent."""
BASE_RPC_API_VERSION = '1.0'
# It seems dhcp agent does not support bulk operation
VALID_RESOURCES = ['network', 'subnet', 'port']
VALID_METHOD_NAMES = ['network.create.end',
'network.update.end',
'network.delete.end',
'subnet.create.end',
'subnet.update.end',
'subnet.delete.end',
'port.create.end',
'port.update.end',
'port.delete.end']
def __init__(self, topic=topics.DHCP_AGENT):
super(DhcpAgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def _notification(self, context, method, payload):
"""Notify all the agents that are hosting the network"""
# By now, we have no scheduling feature, so we fanout
# to all of the DHCP agents
self._notification_fanout(context, method, payload)
def _notification_fanout(self, context, method, payload):
"""Fanout the payload to all dhcp agents"""
self.fanout_cast(
context, self.make_msg(method,
payload=payload),
topic=topics.DHCP_AGENT)
def notify(self, context, data, methodname):
# data is {'key' : 'value'} with only one key
if methodname not in self.VALID_METHOD_NAMES:
return
obj_type = data.keys()[0]
if obj_type not in self.VALID_RESOURCES:
return
obj_value = data[obj_type]
methodname = methodname.replace(".", "_")
if methodname.endswith("_delete_end"):
if 'id' in obj_value:
self._notification(context, methodname,
{obj_type + '_id': obj_value['id']})
else:
self._notification(context, methodname, data)

View File

@ -18,6 +18,9 @@
import netaddr import netaddr
import webob.exc import webob.exc
from oslo.config import cfg
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.v2 import attributes from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource from quantum.api.v2 import resource as wsgi_resource
from quantum.common import exceptions from quantum.common import exceptions
@ -94,6 +97,7 @@ class Controller(object):
self._policy_attrs = [name for (name, info) in self._attr_info.items() self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')] if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network') self._publisher_id = notifier_api.publisher_id('network')
self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self._member_actions = member_actions self._member_actions = member_actions
if parent: if parent:
@ -193,6 +197,10 @@ class Controller(object):
policy.enforce(request.context, action, obj, plugin=self._plugin) policy.enforce(request.context, action, obj, plugin=self._plugin)
return obj return obj
def _send_dhcp_notification(self, context, data, methodname):
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context, data, methodname)
def index(self, request, **kwargs): def index(self, request, **kwargs):
"""Returns a list of the requested entity""" """Returns a list of the requested entity"""
parent_id = kwargs.get(self._parent_id_name) parent_id = kwargs.get(self._parent_id_name)
@ -298,11 +306,15 @@ class Controller(object):
**kwargs) **kwargs)
def notify(create_result): def notify(create_result):
notifier_method = self._resource + '.create.end'
notifier_api.notify(request.context, notifier_api.notify(request.context,
self._publisher_id, self._publisher_id,
self._resource + '.create.end', notifier_method,
notifier_api.CONF.default_notification_level, notifier_api.CONF.default_notification_level,
create_result) create_result)
self._send_dhcp_notification(request.context,
create_result,
notifier_method)
return create_result return create_result
kwargs = {self._parent_id_name: parent_id} if parent_id else {} kwargs = {self._parent_id_name: parent_id} if parent_id else {}
@ -348,11 +360,16 @@ class Controller(object):
obj_deleter = getattr(self._plugin, action) obj_deleter = getattr(self._plugin, action)
obj_deleter(request.context, id, **kwargs) obj_deleter(request.context, id, **kwargs)
notifier_method = self._resource + '.delete.end'
notifier_api.notify(request.context, notifier_api.notify(request.context,
self._publisher_id, self._publisher_id,
self._resource + '.delete.end', notifier_method,
notifier_api.CONF.default_notification_level, notifier_api.CONF.default_notification_level,
{self._resource + '_id': id}) {self._resource + '_id': id})
result = {self._resource: self._view(obj)}
self._send_dhcp_notification(request.context,
result,
notifier_method)
def update(self, request, id, body=None, **kwargs): def update(self, request, id, body=None, **kwargs):
"""Updates the specified entity's attributes""" """Updates the specified entity's attributes"""
@ -398,11 +415,15 @@ class Controller(object):
kwargs[self._parent_id_name] = parent_id kwargs[self._parent_id_name] = parent_id
obj = obj_updater(request.context, id, **kwargs) obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(obj)} result = {self._resource: self._view(obj)}
notifier_method = self._resource + '.update.end'
notifier_api.notify(request.context, notifier_api.notify(request.context,
self._publisher_id, self._publisher_id,
self._resource + '.update.end', notifier_method,
notifier_api.CONF.default_notification_level, notifier_api.CONF.default_notification_level,
result) result)
self._send_dhcp_notification(request.context,
result,
notifier_method)
return result return result
@staticmethod @staticmethod

View File

@ -62,6 +62,9 @@ core_opts = [
help=_("Maximum number of host routes per subnet")), help=_("Maximum number of host routes per subnet")),
cfg.IntOpt('dhcp_lease_duration', default=120, cfg.IntOpt('dhcp_lease_duration', default=120,
help=_("DHCP lease duration")), help=_("DHCP lease duration")),
cfg.BoolOpt('dhcp_agent_notification', default=True,
help=_("Allow sending resource operation"
" notification to DHCP agent")),
cfg.BoolOpt('allow_overlapping_ips', default=False, cfg.BoolOpt('allow_overlapping_ips', default=False,
help=_("Allow overlapping IP support in Quantum")), help=_("Allow overlapping IP support in Quantum")),
cfg.StrOpt('host', default=utils.get_hostname(), cfg.StrOpt('host', default=utils.get_hostname(),

View File

@ -27,6 +27,7 @@ PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer' DHCP = 'q-dhcp-notifer'
L3_AGENT = 'l3_agent' L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
def get_topic_name(prefix, table, operation): def get_topic_name(prefix, table, operation):

View File

@ -62,84 +62,3 @@ class AgentRPCMethods(unittest.TestCase):
with mock.patch(call_to_patch) as create_connection: with mock.patch(call_to_patch) as create_connection:
conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')]) conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected) create_connection.assert_has_calls(expected)
class AgentRPCNotificationDispatcher(unittest.TestCase):
def setUp(self):
self.create_connection_p = mock.patch(
'quantum.openstack.common.rpc.create_connection')
self.create_connection = self.create_connection_p.start()
cfg.CONF.set_override('default_notification_level', 'INFO')
cfg.CONF.set_override('notification_topics', ['notifications'])
def tearDown(self):
self.create_connection_p.stop()
cfg.CONF.reset()
def test_init(self):
nd = rpc.NotificationDispatcher()
expected = [
mock.call(new=True),
mock.call().declare_topic_consumer(topic='notifications.info',
queue_name=mock.ANY,
callback=nd._add_to_queue),
mock.call().consume_in_thread()
]
self.create_connection.assert_has_calls(expected)
def test_add_to_queue(self):
nd = rpc.NotificationDispatcher()
nd._add_to_queue('foo')
self.assertEqual(nd.queue.get(), 'foo')
def _test_run_dispatch_helper(self, msg, handler):
msgs = [msg]
def side_effect(*args):
return msgs.pop(0)
with mock.patch('eventlet.Queue.get') as queue_get:
queue_get.side_effect = side_effect
nd = rpc.NotificationDispatcher()
# catch the assertion so that the loop runs once
self.assertRaises(IndexError, nd.run_dispatch, handler)
def test_run_dispatch_once(self):
class SimpleHandler:
def __init__(self):
self.network_delete_end = mock.Mock()
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
self._test_run_dispatch_helper(msg, handler)
handler.network_delete_end.called_once_with(msg['payload'])
def test_run_dispatch_missing_handler(self):
class SimpleHandler:
self.subnet_create_start = mock.Mock()
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
with mock.patch('quantum.agent.rpc.LOG') as log:
self._test_run_dispatch_helper(msg, handler)
log.assert_has_calls([mock.call.debug(mock.ANY, mock.ANY)])
def test_run_dispatch_handler_raises(self):
class SimpleHandler:
def network_delete_end(self, payload):
raise Exception('foo')
msg = dict(event_type='network.delete.end',
payload=dict(network_id='a'))
handler = SimpleHandler()
with mock.patch('quantum.agent.rpc.LOG') as log:
self._test_run_dispatch_helper(msg, handler)
log.assert_has_calls([mock.call.warn(mock.ANY, mock.ANY)])

View File

@ -19,12 +19,15 @@ import socket
import sys import sys
import uuid import uuid
import eventlet
import mock import mock
from oslo.config import cfg from oslo.config import cfg
import unittest2 as unittest import unittest2 as unittest
from quantum.agent.common import config from quantum.agent.common import config
from quantum.agent import dhcp_agent from quantum.agent import dhcp_agent
from quantum.agent.dhcp_agent import DhcpAgentWithStateReport
from quantum.agent.linux import dhcp
from quantum.agent.linux import interface from quantum.agent.linux import interface
from quantum.common import constants from quantum.common import constants
from quantum.common import exceptions from quantum.common import exceptions
@ -33,6 +36,7 @@ from quantum.openstack.common import jsonutils
ROOTDIR = os.path.dirname(os.path.dirname(__file__)) ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc') ETCDIR = os.path.join(ROOTDIR, 'etc')
HOSTNAME = 'hostname'
def etcdir(*p): def etcdir(*p):
@ -114,34 +118,65 @@ class TestDhcpAgent(unittest.TestCase):
self.driver = mock.Mock(name='driver') self.driver = mock.Mock(name='driver')
self.driver_cls = self.driver_cls_p.start() self.driver_cls = self.driver_cls_p.start()
self.driver_cls.return_value = self.driver self.driver_cls.return_value = self.driver
self.notification_p = mock.patch(
'quantum.agent.rpc.NotificationDispatcher')
self.notification = self.notification_p.start()
def tearDown(self): def tearDown(self):
self.notification_p.stop()
self.driver_cls_p.stop() self.driver_cls_p.stop()
cfg.CONF.reset() cfg.CONF.reset()
def test_dhcp_agent_main(self): def test_dhcp_agent_manager(self):
state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
lease_relay_str = 'quantum.agent.dhcp_agent.DhcpLeaseRelay'
with mock.patch.object(DhcpAgentWithStateReport,
'sync_state',
autospec=True) as mock_sync_state:
with mock.patch.object(DhcpAgentWithStateReport,
'periodic_resync',
autospec=True) as mock_periodic_resync:
with mock.patch(state_rpc_str) as state_rpc:
with mock.patch(lease_relay_str) as mock_lease_relay:
with mock.patch.object(sys, 'argv') as sys_argv:
sys_argv.return_value = [
'dhcp', '--config-file',
etcdir('quantum.conf.test')]
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(
dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(
dhcp_agent.DhcpLeaseRelay.OPTS)
cfg.CONF.register_opts(dhcp.OPTS)
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF(project='quantum')
agent_mgr = DhcpAgentWithStateReport('testhost')
eventlet.greenthread.sleep(1)
agent_mgr.after_start()
mock_sync_state.assert_called_once_with(agent_mgr)
mock_periodic_resync.assert_called_once_with(
agent_mgr)
state_rpc.assert_has_calls(
[mock.call(mock.ANY),
mock.call().report_state(mock.ANY, mock.ANY)])
mock_lease_relay.assert_has_calls(
[mock.call(mock.ANY),
mock.call().start()])
def test_dhcp_agent_main_agent_manager(self):
logging_str = 'quantum.agent.common.config.setup_logging' logging_str = 'quantum.agent.common.config.setup_logging'
manager_str = 'quantum.agent.dhcp_agent.DeviceManager' launcher_str = 'quantum.openstack.common.service.ServiceLauncher'
agent_str = 'quantum.agent.dhcp_agent.DhcpAgent'
with mock.patch(logging_str): with mock.patch(logging_str):
with mock.patch(manager_str) as dev_mgr: with mock.patch.object(sys, 'argv') as sys_argv:
with mock.patch(agent_str) as dhcp: with mock.patch(launcher_str) as launcher:
with mock.patch.object(sys, 'argv') as sys_argv: sys_argv.return_value = ['dhcp', '--config-file',
sys_argv.return_value = ['dhcp', '--config-file', etcdir('quantum.conf.test')]
etcdir('quantum.conf.test')] dhcp_agent.main()
dhcp_agent.main() launcher.assert_has_calls(
dev_mgr.assert_called_once(mock.ANY, 'sudo') [mock.call(), mock.call().launch_service(mock.ANY),
dhcp.assert_has_calls([ mock.call().wait()])
mock.call(mock.ANY),
mock.call().run()])
def test_run_completes_single_pass(self): def test_run_completes_single_pass(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr: with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict( attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in [(a, mock.DEFAULT) for a in
['sync_state', 'lease_relay', 'periodic_resync']]) ['sync_state', 'lease_relay', 'periodic_resync']])
@ -151,19 +186,18 @@ class TestDhcpAgent(unittest.TestCase):
mocks['periodic_resync'].assert_called_once_with() mocks['periodic_resync'].assert_called_once_with()
mocks['lease_relay'].assert_has_mock_calls( mocks['lease_relay'].assert_has_mock_calls(
[mock.call.start()]) [mock.call.start()])
self.notification.assert_has_calls([mock.call.run_dispatch()])
def test_ns_name(self): def test_ns_name(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr: with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
mock_net = mock.Mock(id='foo') mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo') self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
def test_ns_name_disabled_namespace(self): def test_ns_name_disabled_namespace(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr: with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
cfg.CONF.set_override('use_namespaces', False) cfg.CONF.set_override('use_namespaces', False)
mock_net = mock.Mock(id='foo') mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertIsNone(dhcp._ns_name(mock_net)) self.assertIsNone(dhcp._ns_name(mock_net))
def test_call_driver(self): def test_call_driver(self):
@ -185,7 +219,7 @@ class TestDhcpAgent(unittest.TestCase):
self.driver.return_value.foo.side_effect = Exception self.driver.return_value.foo.side_effect = Exception
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr: with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch.object(dhcp_agent.LOG, 'exception') as log: with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertIsNone(dhcp.call_driver('foo', network)) self.assertIsNone(dhcp.call_driver('foo', network))
self.assertTrue(dev_mgr.called) self.assertTrue(dev_mgr.called)
self.driver.assert_called_once_with(cfg.CONF, self.driver.assert_called_once_with(cfg.CONF,
@ -198,7 +232,7 @@ class TestDhcpAgent(unittest.TestCase):
def test_update_lease(self): def test_update_lease(self):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug: with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.update_lease('net_id', '192.168.1.1', 120) dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls( plug.assert_has_calls(
[mock.call().update_lease_expiration( [mock.call().update_lease_expiration(
@ -209,7 +243,7 @@ class TestDhcpAgent(unittest.TestCase):
plug.return_value.update_lease_expiration.side_effect = Exception plug.return_value.update_lease_expiration.side_effect = Exception
with mock.patch.object(dhcp_agent.LOG, 'exception') as log: with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.update_lease('net_id', '192.168.1.1', 120) dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls( plug.assert_has_calls(
[mock.call().update_lease_expiration( [mock.call().update_lease_expiration(
@ -224,7 +258,7 @@ class TestDhcpAgent(unittest.TestCase):
mock_plugin.get_active_networks.return_value = active_networks mock_plugin.get_active_networks.return_value = active_networks
plug.return_value = mock_plugin plug.return_value = mock_plugin
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict( attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in [(a, mock.DEFAULT) for a in
@ -260,21 +294,21 @@ class TestDhcpAgent(unittest.TestCase):
plug.return_value = mock_plugin plug.return_value = mock_plugin
with mock.patch.object(dhcp_agent.LOG, 'exception') as log: with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.sync_state() dhcp.sync_state()
self.assertTrue(log.called) self.assertTrue(log.called)
self.assertTrue(dhcp.needs_resync) self.assertTrue(dhcp.needs_resync)
def test_periodic_resync(self): def test_periodic_resync(self):
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn: with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
dhcp.periodic_resync() dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper) spawn.assert_called_once_with(dhcp._periodic_resync_helper)
def test_periodoc_resync_helper(self): def test_periodoc_resync_helper(self):
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep: with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
dhcp = dhcp_agent.DhcpAgent(cfg.CONF) dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.needs_resync = True dhcp.needs_resync = True
with mock.patch.object(dhcp, 'sync_state') as sync_state: with mock.patch.object(dhcp, 'sync_state') as sync_state:
sync_state.side_effect = RuntimeError sync_state.side_effect = RuntimeError
@ -293,9 +327,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
'quantum.agent.linux.interface.NullDriver') 'quantum.agent.linux.interface.NullDriver')
config.register_root_helper(cfg.CONF) config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS) cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
self.notification_p = mock.patch(
'quantum.agent.rpc.NotificationDispatcher')
self.notification = self.notification_p.start()
self.plugin_p = mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') self.plugin_p = mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi')
plugin_cls = self.plugin_p.start() plugin_cls = self.plugin_p.start()
@ -307,7 +338,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache = mock.Mock() self.cache = mock.Mock()
cache_cls.return_value = self.cache cache_cls.return_value = self.cache
self.dhcp = dhcp_agent.DhcpAgent(cfg.CONF) self.dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.call_driver_p = mock.patch.object(self.dhcp, 'call_driver') self.call_driver_p = mock.patch.object(self.dhcp, 'call_driver')
self.call_driver = self.call_driver_p.start() self.call_driver = self.call_driver_p.start()
@ -321,7 +352,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.call_driver_p.stop() self.call_driver_p.stop()
self.cache_p.stop() self.cache_p.stop()
self.plugin_p.stop() self.plugin_p.stop()
self.notification_p.stop()
def test_enable_dhcp_helper(self): def test_enable_dhcp_helper(self):
self.plugin.get_network_info.return_value = fake_network self.plugin.get_network_info.return_value = fake_network
@ -462,26 +492,26 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
payload = dict(network=dict(id=fake_network.id)) payload = dict(network=dict(id=fake_network.id))
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_create_end(payload) self.dhcp.network_create_end(None, payload)
enable.assertCalledOnceWith(fake_network.id) enable.assertCalledOnceWith(fake_network.id)
def test_network_update_end_admin_state_up(self): def test_network_update_end_admin_state_up(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=True)) payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable: with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
self.dhcp.network_update_end(payload) self.dhcp.network_update_end(None, payload)
enable.assertCalledOnceWith(fake_network.id) enable.assertCalledOnceWith(fake_network.id)
def test_network_update_end_admin_state_down(self): def test_network_update_end_admin_state_down(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=False)) payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_update_end(payload) self.dhcp.network_update_end(None, payload)
disable.assertCalledOnceWith(fake_network.id) disable.assertCalledOnceWith(fake_network.id)
def test_network_delete_end(self): def test_network_delete_end(self):
payload = dict(network_id=fake_network.id) payload = dict(network_id=fake_network.id)
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable: with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
self.dhcp.network_delete_end(payload) self.dhcp.network_delete_end(None, payload)
disable.assertCalledOnceWith(fake_network.id) disable.assertCalledOnceWith(fake_network.id)
def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self): def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
@ -523,13 +553,13 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache.get_network_by_id.return_value = fake_network self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = fake_network self.plugin.get_network_info.return_value = fake_network
self.dhcp.subnet_update_end(payload) self.dhcp.subnet_update_end(None, payload)
self.cache.assert_has_calls([mock.call.put(fake_network)]) self.cache.assert_has_calls([mock.call.put(fake_network)])
self.call_driver.assert_called_once_with('reload_allocations', self.call_driver.assert_called_once_with('reload_allocations',
fake_network) fake_network)
def test_subnet_update_end(self): def test_subnet_update_end_restart(self):
new_state = FakeModel(fake_network.id, new_state = FakeModel(fake_network.id,
tenant_id=fake_network.tenant_id, tenant_id=fake_network.tenant_id,
admin_state_up=True, admin_state_up=True,
@ -540,7 +570,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache.get_network_by_id.return_value = fake_network self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = new_state self.plugin.get_network_info.return_value = new_state
self.dhcp.subnet_update_end(payload) self.dhcp.subnet_update_end(None, payload)
self.cache.assert_has_calls([mock.call.put(new_state)]) self.cache.assert_has_calls([mock.call.put(new_state)])
self.call_driver.assert_called_once_with('restart', self.call_driver.assert_called_once_with('restart',
@ -558,7 +588,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache.get_network_by_id.return_value = prev_state self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network self.plugin.get_network_info.return_value = fake_network
self.dhcp.subnet_delete_end(payload) self.dhcp.subnet_delete_end(None, payload)
self.cache.assert_has_calls([ self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id( mock.call.get_network_by_subnet_id(
@ -571,7 +601,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
def test_port_update_end(self): def test_port_update_end(self):
payload = dict(port=vars(fake_port2)) payload = dict(port=vars(fake_port2))
self.cache.get_network_by_id.return_value = fake_network self.cache.get_network_by_id.return_value = fake_network
self.dhcp.port_update_end(payload) self.dhcp.port_update_end(None, payload)
self.cache.assert_has_calls( self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_port2.network_id), [mock.call.get_network_by_id(fake_port2.network_id),
mock.call.put_port(mock.ANY)]) mock.call.put_port(mock.ANY)])
@ -583,7 +613,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.cache.get_network_by_id.return_value = fake_network self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2 self.cache.get_port_by_id.return_value = fake_port2
self.dhcp.port_delete_end(payload) self.dhcp.port_delete_end(None, payload)
self.cache.assert_has_calls( self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id), [mock.call.get_port_by_id(fake_port2.id),
@ -596,7 +626,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
payload = dict(port_id='unknown') payload = dict(port_id='unknown')
self.cache.get_port_by_id.return_value = None self.cache.get_port_by_id.return_value = None
self.dhcp.port_delete_end(payload) self.dhcp.port_delete_end(None, payload)
self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')]) self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
self.assertEqual(self.call_driver.call_count, 0) self.assertEqual(self.call_driver.call_count, 0)