Change-Id: Ib13c46e1b5b1415f52ef4ae4fc784323ae2b6f44changes/17/104817/1
parent
2a4b74fe5a
commit
3d51034929
@ -1,177 +0,0 @@
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DhcpAgentNotifyAPI(rpc_compat.RpcProxy):
|
||||
"""API for plugin to notify DHCP agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
# It seems dhcp agent does not support bulk operation
|
||||
VALID_RESOURCES = ['network', 'subnet', 'port']
|
||||
VALID_METHOD_NAMES = ['network.create.end',
|
||||
'network.update.end',
|
||||
'network.delete.end',
|
||||
'subnet.create.end',
|
||||
'subnet.update.end',
|
||||
'subnet.delete.end',
|
||||
'port.create.end',
|
||||
'port.update.end',
|
||||
'port.delete.end']
|
||||
|
||||
def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
|
||||
super(DhcpAgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
self._plugin = plugin
|
||||
|
||||
@property
|
||||
def plugin(self):
|
||||
if self._plugin is None:
|
||||
self._plugin = manager.NeutronManager.get_plugin()
|
||||
return self._plugin
|
||||
|
||||
def _schedule_network(self, context, network, existing_agents):
|
||||
"""Schedule the network to new agents
|
||||
|
||||
:return: all agents associated with the network
|
||||
"""
|
||||
new_agents = self.plugin.schedule_network(context, network) or []
|
||||
if new_agents:
|
||||
for agent in new_agents:
|
||||
self._cast_message(
|
||||
context, 'network_create_end',
|
||||
{'network': {'id': network['id']}}, agent['host'])
|
||||
elif not existing_agents:
|
||||
LOG.warn(_('Unable to schedule network %s: no agents available; '
|
||||
'will retry on subsequent port creation events.'),
|
||||
network['id'])
|
||||
return new_agents + existing_agents
|
||||
|
||||
def _get_enabled_agents(self, context, network, agents, method, payload):
|
||||
"""Get the list of agents whose admin_state is UP."""
|
||||
network_id = network['id']
|
||||
enabled_agents = [x for x in agents if x.admin_state_up]
|
||||
active_agents = [x for x in agents if x.is_active]
|
||||
len_enabled_agents = len(enabled_agents)
|
||||
len_active_agents = len(active_agents)
|
||||
if len_active_agents < len_enabled_agents:
|
||||
LOG.warn(_("Only %(active)d of %(total)d DHCP agents associated "
|
||||
"with network '%(net_id)s' are marked as active, so "
|
||||
" notifications may be sent to inactive agents.")
|
||||
% {'active': len_active_agents,
|
||||
'total': len_enabled_agents,
|
||||
'net_id': network_id})
|
||||
if not enabled_agents:
|
||||
num_ports = self.plugin.get_ports_count(
|
||||
context, {'network_id': [network_id]})
|
||||
notification_required = (
|
||||
num_ports > 0 and len(network['subnets']) >= 1)
|
||||
if notification_required:
|
||||
LOG.error(_("Will not send event %(method)s for network "
|
||||
"%(net_id)s: no agent available. Payload: "
|
||||
"%(payload)s")
|
||||
% {'method': method,
|
||||
'net_id': network_id,
|
||||
'payload': payload})
|
||||
return enabled_agents
|
||||
|
||||
def _notify_agents(self, context, method, payload, network_id):
|
||||
"""Notify all the agents that are hosting the network."""
|
||||
# fanout is required as we do not know who is "listening"
|
||||
no_agents = not utils.is_extension_supported(
|
||||
self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)
|
||||
fanout_required = method == 'network_delete_end' or no_agents
|
||||
|
||||
# we do nothing on network creation because we want to give the
|
||||
# admin the chance to associate an agent to the network manually
|
||||
cast_required = method != 'network_create_end'
|
||||
|
||||
if fanout_required:
|
||||
self._fanout_message(context, method, payload)
|
||||
elif cast_required:
|
||||
admin_ctx = (context if context.is_admin else context.elevated())
|
||||
network = self.plugin.get_network(admin_ctx, network_id)
|
||||
agents = self.plugin.get_dhcp_agents_hosting_networks(
|
||||
context, [network_id])
|
||||
|
||||
# schedule the network first, if needed
|
||||
schedule_required = method == 'port_create_end'
|
||||
if schedule_required:
|
||||
agents = self._schedule_network(admin_ctx, network, agents)
|
||||
|
||||
enabled_agents = self._get_enabled_agents(
|
||||
context, network, agents, method, payload)
|
||||
for agent in enabled_agents:
|
||||
self._cast_message(
|
||||
context, method, payload, agent.host, agent.topic)
|
||||
|
||||
def _cast_message(self, context, method, payload, host,
|
||||
topic=topics.DHCP_AGENT):
|
||||
"""Cast the payload to the dhcp agent running on the host."""
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topic, host))
|
||||
|
||||
def _fanout_message(self, context, method, payload):
|
||||
"""Fanout the payload to all dhcp agents."""
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic=topics.DHCP_AGENT)
|
||||
|
||||
def network_removed_from_agent(self, context, network_id, host):
|
||||
self._cast_message(context, 'network_delete_end',
|
||||
{'network_id': network_id}, host)
|
||||
|
||||
def network_added_to_agent(self, context, network_id, host):
|
||||
self._cast_message(context, 'network_create_end',
|
||||
{'network': {'id': network_id}}, host)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._cast_message(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up}, host)
|
||||
|
||||
def notify(self, context, data, method_name):
|
||||
# data is {'key' : 'value'} with only one key
|
||||
if method_name not in self.VALID_METHOD_NAMES:
|
||||
return
|
||||
obj_type = data.keys()[0]
|
||||
if obj_type not in self.VALID_RESOURCES:
|
||||
return
|
||||
obj_value = data[obj_type]
|
||||
network_id = None
|
||||
if obj_type == 'network' and 'id' in obj_value:
|
||||
network_id = obj_value['id']
|
||||
elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
|
||||
network_id = obj_value['network_id']
|
||||
if not network_id:
|
||||
return
|
||||
method_name = method_name.replace(".", "_")
|
||||
if method_name.endswith("_delete_end"):
|
||||
if 'id' in obj_value:
|
||||
self._notify_agents(context, method_name,
|
||||
{obj_type + '_id': obj_value['id']},
|
||||
network_id)
|
||||
else:
|
||||
self._notify_agents(context, method_name, data, network_id)
|
@ -1,121 +0,0 @@
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L3AgentNotifyAPI(rpc_compat.RpcProxy):
|
||||
"""API for plugin to notify L3 agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=topics.L3_AGENT):
|
||||
super(L3AgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
"""Notify the agent that is hosting the router."""
|
||||
LOG.debug(_('Nofity agent at %(host)s the message '
|
||||
'%(method)s'), {'host': host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topics.L3_AGENT, host))
|
||||
|
||||
def _agent_notification(self, context, method, router_ids,
|
||||
operation, data):
|
||||
"""Notify changed routers to hosting l3 agents."""
|
||||
adminContext = context.is_admin and context or context.elevated()
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
for router_id in router_ids:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
adminContext, [router_id],
|
||||
admin_state_up=True,
|
||||
active=True)
|
||||
for l3_agent in l3_agents:
|
||||
LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
|
||||
'%(method)s'),
|
||||
{'topic': l3_agent.topic,
|
||||
'host': l3_agent.host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
routers=[router_id]),
|
||||
topic='%s.%s' % (l3_agent.topic, l3_agent.host),
|
||||
version='1.1')
|
||||
|
||||
def _notification(self, context, method, router_ids, operation, data):
|
||||
"""Notify all the agents that are hosting the routers."""
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
if not plugin:
|
||||
LOG.error(_('No plugin for L3 routing registered. Cannot notify '
|
||||
'agents with the message %s'), method)
|
||||
return
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
|
||||
adminContext = (context.is_admin and
|
||||
context or context.elevated())
|
||||
plugin.schedule_routers(adminContext, router_ids)
|
||||
self._agent_notification(
|
||||
context, method, router_ids, operation, data)
|
||||
else:
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
routers=router_ids),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def _notification_fanout(self, context, method, router_id):
|
||||
"""Fanout the deleted router to all L3 agents."""
|
||||
LOG.debug(_('Fanout notify agent at %(topic)s the message '
|
||||
'%(method)s on router %(router_id)s'),
|
||||
{'topic': topics.L3_AGENT,
|
||||
'method': method,
|
||||
'router_id': router_id})
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
router_id=router_id),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._notification_host(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up},
|
||||
host)
|
||||
|
||||
def router_deleted(self, context, router_id):
|
||||
self._notification_fanout(context, 'router_deleted', router_id)
|
||||
|
||||
def routers_updated(self, context, router_ids, operation=None, data=None):
|
||||
if router_ids:
|
||||
self._notification(context, 'routers_updated', router_ids,
|
||||
operation, data)
|
||||
|
||||
def router_removed_from_agent(self, context, router_id, host):
|
||||
self._notification_host(context, 'router_removed_from_agent',
|
||||
{'router_id': router_id}, host)
|
||||
|
||||
def router_added_to_agent(self, context, router_ids, host):
|
||||
self._notification_host(context, 'router_added_to_agent',
|
||||
router_ids, host)
|
@ -1,99 +0,0 @@
|
||||
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
|
||||
#
|
||||
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import rpc_compat
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MeteringAgentNotifyAPI(rpc_compat.RpcProxy):
|
||||
"""API for plugin to notify L3 metering agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=topics.METERING_AGENT):
|
||||
super(MeteringAgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _agent_notification(self, context, method, routers):
|
||||
"""Notify l3 metering agents hosted by l3 agent hosts."""
|
||||
adminContext = context.is_admin and context or context.elevated()
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
|
||||
l3_routers = {}
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
adminContext, [router['id']],
|
||||
admin_state_up=True,
|
||||
active=True)
|
||||
for l3_agent in l3_agents:
|
||||
LOG.debug(_('Notify metering agent at %(topic)s.%(host)s '
|
||||
'the message %(method)s'),
|
||||
{'topic': self.topic,
|
||||
'host': l3_agent.host,
|
||||
'method': method})
|
||||
|
||||
l3_router = l3_routers.get(l3_agent.host, [])
|
||||
l3_router.append(router)
|
||||
l3_routers[l3_agent.host] = l3_router
|
||||
|
||||
for host, routers in l3_routers.iteritems():
|
||||
self.cast(context, self.make_msg(method, routers=routers),
|
||||
topic='%s.%s' % (self.topic, host))
|
||||
|
||||
def _notification_fanout(self, context, method, router_id):
|
||||
LOG.debug(_('Fanout notify metering agent at %(topic)s the message '
|
||||
'%(method)s on router %(router_id)s'),
|
||||
{'topic': self.topic,
|
||||
'method': method,
|
||||
'router_id': router_id})
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
router_id=router_id),
|
||||
topic=self.topic)
|
||||
|
||||
def _notification(self, context, method, routers):
|
||||
"""Notify all the agents that are hosting the routers."""
|
||||
plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
service_constants.L3_ROUTER_NAT)
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
|
||||
self._agent_notification(context, method, routers)
|
||||
else:
|
||||
self.fanout_cast(context, self.make_msg(method, routers=routers),
|
||||
topic=self.topic)
|
||||
|
||||
def router_deleted(self, context, router_id):
|
||||
self._notification_fanout(context, 'router_deleted', router_id)
|
||||
|
||||
def routers_updated(self, context, routers):
|
||||
if routers:
|
||||
self._notification(context, 'routers_updated', routers)
|
||||
|
||||
def update_metering_label_rules(self, context, routers):
|
||||
self._notification(context, 'update_metering_label_rules', routers)
|
||||
|
||||
def add_metering_label(self, context, routers):
|
||||
self._notification(context, 'add_metering_label', routers)
|
||||
|
||||
def remove_metering_label(self, context, routers):
|
||||
self._notification(context, 'remove_metering_label', routers)
|
Loading…
Reference in new issue