Merge "Fix Metering doesn't respect the l3 agent binding"

This commit is contained in:
Jenkins 2014-04-23 16:05:39 +00:00 committed by Gerrit Code Review
commit 1090267c0b
7 changed files with 235 additions and 65 deletions

View File

@ -20,6 +20,7 @@ from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
@ -35,7 +36,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy):
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
adminContext = context.is_admin and context or context.elevated()
plugin = manager.NeutronManager.get_plugin()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
l3_routers = {}
for router in routers:
@ -71,7 +73,8 @@ class MeteringAgentNotifyAPI(proxy.RpcProxy):
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
plugin = manager.NeutronManager.get_plugin()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if utils.is_extension_supported(
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)

View File

@ -227,12 +227,13 @@ class MeteringDbMixin(metering.MeteringPluginBase,
return routers_dict.values()
def get_sync_data_metering(self, context, label_id=None):
with context.session.begin(subtransactions=True):
if label_id:
label = self._get_by_id(context, MeteringLabel, label_id)
labels = [label]
else:
labels = self._get_collection_query(context, MeteringLabel)
def get_sync_data_metering(self, context, label_id=None, router_ids=None):
labels = context.session.query(MeteringLabel)
if label_id:
labels = labels.filter(MeteringLabel.id == label_id)
elif router_ids:
labels = (labels.join(MeteringLabel.routers).
filter(l3_db.Router.id.in_(router_ids)))
return self._process_sync_metering_data(labels)

View File

@ -0,0 +1,59 @@
# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
#
# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants as consts
from neutron.common import rpc as p_rpc
from neutron.common import utils
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
class MeteringRpcCallbacks(object):
RPC_API_VERSION = '1.0'
def __init__(self, meter_plugin):
self.meter_plugin = meter_plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
def get_sync_data_metering(self, context, **kwargs):
l3_plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if not l3_plugin:
return
host = kwargs.get('host')
if not utils.is_extension_supported(
l3_plugin, consts.L3_AGENT_SCHEDULER_EXT_ALIAS) or not host:
return self.meter_plugin.get_sync_data_metering(context)
else:
agents = l3_plugin.get_l3_agents(context, filters={'host': [host]})
if not agents:
LOG.error(_('Unable to find agent %s.'), host)
return
routers = l3_plugin.list_routers_on_l3_agent(context, agents[0].id)
router_ids = [router['id'] for router in routers['routers']]
if not router_ids:
return
return self.meter_plugin.get_sync_data_metering(context,
router_ids=router_ids)

View File

@ -88,7 +88,7 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
self.label_tenant_id = {}
self.routers = {}
self.metering_infos = {}
super(MeteringAgent, self).__init__(host=self.conf.host)
super(MeteringAgent, self).__init__(host=host)
def _load_drivers(self):
"""Loads plugin-driver from configuration."""

View File

@ -15,26 +15,12 @@
# under the License.
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import rpc as p_rpc
from neutron.common import topics
from neutron.db.metering import metering_db
from neutron.db.metering import metering_rpc
from neutron.openstack.common import rpc
class MeteringCallbacks(metering_db.MeteringDbMixin):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return p_rpc.PluginRpcDispatcher([self])
def get_sync_data_metering(self, context, **kwargs):
return super(MeteringCallbacks, self).get_sync_data_metering(context)
class MeteringPlugin(metering_db.MeteringDbMixin):
"""Implementation of the Neutron Metering Service Plugin."""
supported_extension_aliases = ["metering"]
@ -42,7 +28,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
def __init__(self):
super(MeteringPlugin, self).__init__()
self.callbacks = MeteringCallbacks(self)
self.callbacks = metering_rpc.MeteringRpcCallbacks(self)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(

View File

@ -17,11 +17,16 @@
import mock
from neutron.api.v2 import attributes as attr
from neutron.common import constants as n_constants
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db
from neutron.db.metering import metering_rpc
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
from neutron import manager
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.tests.unit.db.metering import test_db_metering
@ -31,7 +36,7 @@ from neutron.tests.unit import test_l3_plugin
_uuid = uuidutils.generate_uuid
DB_METERING_PLUGIN_KLASS = (
METERING_SERVICE_PLUGIN_KLASS = (
"neutron.services.metering."
"metering_plugin.MeteringPlugin"
)
@ -65,8 +70,9 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin = 'neutron.tests.unit.test_l3_plugin.TestL3NatIntPlugin'
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPlugin, self).setUp(plugin=plugin, ext_mgr=ext_mgr,
service_plugins=service_plugins)
@ -251,12 +257,8 @@ class TestMeteringPlugin(test_db_plugin.NeutronDbPluginV2TestCase,
self.assertEqual(tenant_id, router['router']['tenant_id'])
class TestRouteIntPlugin(l3_agentschedulers_db.L3AgentSchedulerDbMixin,
test_l3_plugin.TestL3NatIntPlugin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
class TestMeteringPluginL3AgentScheduler(
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
@ -266,10 +268,18 @@ class TestMeteringPluginL3AgentScheduler(
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin_str = ('neutron.tests.unit.services.metering.'
'test_metering_plugin.TestRouteIntPlugin')
def setUp(self, plugin_str=None, service_plugins=None, scheduler=None):
if not plugin_str:
plugin_str = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatIntAgentSchedulingPlugin')
if not service_plugins:
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
if not scheduler:
scheduler = plugin_str
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPluginL3AgentScheduler,
self).setUp(plugin=plugin_str, ext_mgr=ext_mgr,
@ -291,7 +301,7 @@ class TestMeteringPluginL3AgentScheduler(
return_value=self.ctx)
self.mock_context = self.context_patch.start()
self.l3routers_patch = mock.patch(plugin_str +
self.l3routers_patch = mock.patch(scheduler +
'.get_l3_agents_hosting_routers')
self.l3routers_mock = self.l3routers_patch.start()
@ -299,30 +309,40 @@ class TestMeteringPluginL3AgentScheduler(
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
expected = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid},
{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': second_uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected1 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router1',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': self.uuid}]},
'namespace': None,
'method': 'add_metering_label'}
expected2 = {'args': {'routers': [{'status': 'ACTIVE',
'name': 'router2',
'gw_port_id': None,
'admin_state_up': True,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rules': [],
'id': second_uuid}],
'id': second_uuid}]},
'namespace': None,
'method': 'add_metering_label'}
agent_host = 'l3_agent_host'
agent = agents_db.Agent(host=agent_host)
self.l3routers_mock.return_value = [agent]
# bind each router to a specific agent
agent1 = agents_db.Agent(host='agent1')
agent2 = agents_db.Agent(host='agent2')
agents = {self.uuid: agent1,
second_uuid: agent2}
def side_effect(context, routers, admin_state_up, active):
return [agents[routers[0]]]
self.l3routers_mock.side_effect = side_effect
with self.router(name='router1', tenant_id=self.tenant_id,
set_context=True):
@ -331,7 +351,99 @@ class TestMeteringPluginL3AgentScheduler(
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
topic = "%s.%s" % (self.topic, agent_host)
self.mock_cast.assert_called_with(self.ctx,
expected,
topic=topic)
topic1 = "%s.%s" % (self.topic, 'agent1')
topic2 = "%s.%s" % (self.topic, 'agent2')
# check if there is a call per agent
expected = [mock.call(self.ctx, expected1, topic=topic1),
mock.call(self.ctx, expected2, topic=topic2)]
self.mock_cast.assert_has_calls(expected, any_order=True)
class TestMeteringPluginL3AgentSchedulerServicePlugin(
TestMeteringPluginL3AgentScheduler):
"""Unit tests for the case where separate service plugin
implements L3 routing.
"""
def setUp(self):
l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatAgentSchedulingServicePlugin')
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS,
'l3_plugin_name': l3_plugin}
plugin_str = ('neutron.tests.unit.test_l3_plugin.'
'TestNoL3NatPlugin')
super(TestMeteringPluginL3AgentSchedulerServicePlugin, self).setUp(
plugin_str=plugin_str, service_plugins=service_plugins,
scheduler=l3_plugin)
class TestMeteringPluginRpcFromL3Agent(
test_db_plugin.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseMixin,
test_db_metering.MeteringPluginDbTestCaseMixin):
resource_prefix_map = dict(
(k.replace('_', '-'), constants.COMMON_PREFIXES[constants.METERING])
for k in ext_metering.RESOURCE_ATTRIBUTE_MAP
)
def setUp(self):
service_plugins = {'metering_plugin_name':
METERING_SERVICE_PLUGIN_KLASS}
plugin = ('neutron.tests.unit.test_l3_plugin.'
'TestL3NatIntAgentSchedulingPlugin')
ext_mgr = MeteringTestExtensionManager()
super(TestMeteringPluginRpcFromL3Agent,
self).setUp(plugin=plugin, service_plugins=service_plugins,
ext_mgr=ext_mgr)
self.meter_plugin = manager.NeutronManager.get_service_plugins().get(
constants.METERING)
self.adminContext = context.get_admin_context()
self._register_l3_agent('agent1')
def _register_l3_agent(self, host):
agent = {
'binary': 'neutron-l3-agent',
'host': host,
'topic': topics.L3_AGENT,
'configurations': {},
'agent_type': n_constants.AGENT_TYPE_L3,
'start_flag': True
}
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': agent},
time=timeutils.strtime())
def test_get_sync_data_metering(self):
with self.subnet() as subnet:
s = subnet['subnet']
self._set_net_external(s['network_id'])
with self.router(name='router1', subnet=subnet) as router:
r = router['router']
self._add_external_gateway_to_router(r['id'], s['network_id'])
with self.metering_label(tenant_id=r['tenant_id']):
callbacks = metering_rpc.MeteringRpcCallbacks(
self.meter_plugin)
data = callbacks.get_sync_data_metering(self.adminContext,
host='agent1')
self.assertEqual('router1', data[0]['name'])
self._register_l3_agent('agent2')
data = callbacks.get_sync_data_metering(self.adminContext,
host='agent2')
self.assertFalse(data)
self._remove_external_gateway_from_router(
r['id'], s['network_id'])

View File

@ -298,6 +298,15 @@ class TestL3NatServicePlugin(db_base_plugin_v2.CommonDbMixin,
return "L3 Routing Service Plugin for testing"
# A L3 routing with L3 agent scheduling service plugin class for tests with
# plugins that delegate away L3 routing functionality
class TestL3NatAgentSchedulingServicePlugin(TestL3NatServicePlugin,
l3_agentschedulers_db.
L3AgentSchedulerDbMixin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
class L3NatTestCaseMixin(object):
def _create_router(self, fmt, tenant_id, name=None,