Add scheduling feature basing on agent management extension
3rd part of blueprint quantum-scheduler 1. Allow networks to be hosted by certain dhcp agents. Network to dhcp agent is a many to many relationship. Provide a simple scheduler to schedule a network randomly to an active dhcp agent when a network or port is created. 2. Allow admin user to (de)schedule network to a certain dhcp agent manually. 3. Allow routers to be hosted by a certain l3 agent. Router to l3 agent is a many to one relationship. Provide a simple scheduler to schedule a router to l3 agent if the router is not scheduled when the router is updated. 4. Auto schedule networks and routers to agents when agents start. 5. Only support ovs plugin at this point Change-Id: Iddec3ea9d4c0fe2d51a59f7db47145722fc5a1cd
This commit is contained in:
@@ -13,7 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.common import topics
|
||||
from quantum.common import utils
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
@@ -40,11 +43,35 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
super(DhcpAgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification(self, context, method, payload):
|
||||
def _get_dhcp_agents(self, context, network_id):
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
|
||||
context, [network_id], active=True)
|
||||
return [(dhcp_agent.host, dhcp_agent.topic) for
|
||||
dhcp_agent in dhcp_agents]
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
"""Notify the agent on host"""
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topics.DHCP_AGENT, host))
|
||||
|
||||
def _notification(self, context, method, payload, network_id):
|
||||
"""Notify all the agents that are hosting the network"""
|
||||
# By now, we have no scheduling feature, so we fanout
|
||||
# to all of the DHCP agents
|
||||
self._notification_fanout(context, method, payload)
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
if (method != 'network_delete_end' and utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)):
|
||||
for (host, topic) in self._get_dhcp_agents(context, network_id):
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topic, host))
|
||||
else:
|
||||
# besides the non-agentscheduler plugin,
|
||||
# There is no way to query who is hosting the network
|
||||
# when the network is deleted, so we need to fanout
|
||||
self._notification_fanout(context, method, payload)
|
||||
|
||||
def _notification_fanout(self, context, method, payload):
|
||||
"""Fanout the payload to all dhcp agents"""
|
||||
@@ -53,6 +80,19 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
payload=payload),
|
||||
topic=topics.DHCP_AGENT)
|
||||
|
||||
def network_removed_from_agent(self, context, network_id, host):
|
||||
self._notification_host(context, 'network_delete_end',
|
||||
{'network_id': network_id}, host)
|
||||
|
||||
def network_added_to_agent(self, context, network_id, host):
|
||||
self._notification_host(context, 'network_create_end',
|
||||
{'network': {'id': network_id}}, host)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._notification_host(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up},
|
||||
host)
|
||||
|
||||
def notify(self, context, data, methodname):
|
||||
# data is {'key' : 'value'} with only one key
|
||||
if methodname not in self.VALID_METHOD_NAMES:
|
||||
@@ -61,10 +101,18 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy):
|
||||
if obj_type not in self.VALID_RESOURCES:
|
||||
return
|
||||
obj_value = data[obj_type]
|
||||
network_id = None
|
||||
if obj_type == 'network' and 'id' in obj_value:
|
||||
network_id = obj_value['id']
|
||||
elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
|
||||
network_id = obj_value['network_id']
|
||||
if not network_id:
|
||||
return
|
||||
methodname = methodname.replace(".", "_")
|
||||
if methodname.endswith("_delete_end"):
|
||||
if 'id' in obj_value:
|
||||
self._notification(context, methodname,
|
||||
{obj_type + '_id': obj_value['id']})
|
||||
{obj_type + '_id': obj_value['id']},
|
||||
network_id)
|
||||
else:
|
||||
self._notification(context, methodname, data)
|
||||
self._notification(context, methodname, data, network_id)
|
||||
|
||||
120
quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py
Normal file
120
quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py
Normal file
@@ -0,0 +1,120 @@
|
||||
# Copyright (c) 2013 OpenStack, LLC.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from quantum.common import constants
|
||||
from quantum.common import topics
|
||||
from quantum.common import utils
|
||||
from quantum import manager
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import proxy
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L3AgentNotifyAPI(proxy.RpcProxy):
|
||||
"""API for plugin to notify L3 agent."""
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=topics.L3_AGENT):
|
||||
super(L3AgentNotifyAPI, self).__init__(
|
||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
"""Notify the agent that is hosting the router"""
|
||||
LOG.debug(_('Nofity agent at %(host)s the message '
|
||||
'%(method)s'), {'host': host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
payload=payload),
|
||||
topic='%s.%s' % (topics.L3_AGENT, host))
|
||||
|
||||
def _agent_notification(self, context, method, routers,
|
||||
operation, data):
|
||||
"""Notify changed routers to hosting l3 agents.
|
||||
|
||||
Adjust routers according to l3 agents' role and
|
||||
related dhcp agents.
|
||||
Notify dhcp agent to get right subnet's gateway ips.
|
||||
"""
|
||||
adminContext = context.is_admin and context or context.elevated()
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
for router in routers:
|
||||
l3_agents = plugin.get_l3_agents_hosting_routers(
|
||||
adminContext, [router['id']],
|
||||
admin_state_up=True,
|
||||
active=True)
|
||||
for l3_agent in l3_agents:
|
||||
LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
|
||||
'%(method)s'),
|
||||
{'topic': l3_agent.topic,
|
||||
'host': l3_agent.host,
|
||||
'method': method})
|
||||
self.cast(
|
||||
context, self.make_msg(method,
|
||||
routers=[router]),
|
||||
topic='%s.%s' % (l3_agent.topic, l3_agent.host))
|
||||
|
||||
def _notification(self, context, method, routers, operation, data):
|
||||
"""Notify all the agents that are hosting the routers"""
|
||||
plugin = manager.QuantumManager.get_plugin()
|
||||
if utils.is_extension_supported(
|
||||
plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
|
||||
adminContext = (context.is_admin and
|
||||
context or context.elevated())
|
||||
plugin.schedule_routers(adminContext, routers)
|
||||
self._agent_notification(
|
||||
context, method, routers, operation, data)
|
||||
else:
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
routers=routers),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def _notification_fanout(self, context, method, router_id):
|
||||
"""Fanout the deleted router to all L3 agents"""
|
||||
LOG.debug(_('Fanout notify agent at %(topic)s the message '
|
||||
'%(method)s on router %(router_id)s'),
|
||||
{'topic': topics.DHCP_AGENT,
|
||||
'method': method,
|
||||
'router_id': router_id})
|
||||
self.fanout_cast(
|
||||
context, self.make_msg(method,
|
||||
router_id=router_id),
|
||||
topic=topics.L3_AGENT)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
self._notification_host(context, 'agent_updated',
|
||||
{'admin_state_up': admin_state_up},
|
||||
host)
|
||||
|
||||
def router_deleted(self, context, router_id):
|
||||
self._notification_fanout(context, 'router_deleted', router_id)
|
||||
|
||||
def routers_updated(self, context, routers, operation=None, data=None):
|
||||
if routers:
|
||||
self._notification(context, 'routers_updated', routers,
|
||||
operation, data)
|
||||
|
||||
def router_removed_from_agent(self, context, router_id, host):
|
||||
self._notification_host(context, 'router_removed_from_agent',
|
||||
{'router_id': router_id}, host)
|
||||
|
||||
def router_added_to_agent(self, context, routers, host):
|
||||
self._notification_host(context, 'router_added_to_agent',
|
||||
routers, host)
|
||||
|
||||
L3AgentNotify = L3AgentNotifyAPI()
|
||||
Reference in New Issue
Block a user