From e318a3d7cf6ae3d547bfa4e43b01b659cb0adb1b Mon Sep 17 00:00:00 2001 From: Elena Ezhova Date: Fri, 19 Jun 2015 11:24:51 +0300 Subject: [PATCH] Switch to oslo.service oslo.service has graduated, so neutron_lbaas should consume it. As neutron_lbaas.agent.agent.LbaasAgentService subclasses neutron.common.rpc.Service which in its turn derives from oslo_service.service.Service in a result we get a circular dependency between neutron and neutron_lbaas. To avoid it, neutron.common.rpc to neutron_lbaas is temporarily copied to neutron_lbaas. It will be removed after both neutron and neutron_lbaas are ported to oslo_service. Partial-Bug: #1466851 Change-Id: I2093b37d411df9a26958fa50ff523c258bbe06ec Depends-On: I305cf53bad6213c151395e93d656b53a8a28e1db --- neutron_lbaas/agent/agent.py | 6 +- neutron_lbaas/agent/agent_manager.py | 6 +- neutron_lbaas/agent/common_rpc.py | 211 ++++++++++++++++++ .../haproxy/synchronous_namespace_driver.py | 2 +- .../drivers/netscaler/netscaler_driver_v2.py | 2 +- .../services/loadbalancer/agent/agent.py | 6 +- .../loadbalancer/agent/agent_manager.py | 6 +- .../loadbalancer/drivers/embrane/poller.py | 2 +- neutron_lbaas/tests/unit/agent/test_agent.py | 2 +- .../services/loadbalancer/agent/test_agent.py | 2 +- requirements.txt | 1 + 11 files changed, 229 insertions(+), 17 deletions(-) create mode 100644 neutron_lbaas/agent/common_rpc.py diff --git a/neutron_lbaas/agent/agent.py b/neutron_lbaas/agent/agent.py index 8e0240583..d72e88db9 100644 --- a/neutron_lbaas/agent/agent.py +++ b/neutron_lbaas/agent/agent.py @@ -22,11 +22,11 @@ eventlet.monkey_patch() from neutron.agent.common import config from neutron.agent.linux import interface from neutron.common import config as common_config -from neutron.common import rpc as n_rpc -from neutron.openstack.common import service from oslo_config import cfg +from oslo_service import service from neutron_lbaas.agent import agent_manager as manager +from neutron_lbaas.agent import common_rpc as n_rpc from neutron_lbaas.services.loadbalancer import constants OPTS = [ @@ -67,4 +67,4 @@ def main(): topic=constants.LOADBALANCER_AGENTV2, manager=mgr ) - service.launch(svc).wait() + service.launch(cfg.CONF, svc).wait() diff --git a/neutron_lbaas/agent/agent_manager.py b/neutron_lbaas/agent/agent_manager.py index 044089815..a060c95df 100644 --- a/neutron_lbaas/agent/agent_manager.py +++ b/neutron_lbaas/agent/agent_manager.py @@ -17,13 +17,13 @@ from neutron.agent import rpc as agent_rpc from neutron.common import exceptions as n_exc from neutron import context as ncontext from neutron.i18n import _LE, _LI -from neutron.openstack.common import loopingcall -from neutron.openstack.common import periodic_task from neutron.plugins.common import constants from neutron.services import provider_configuration as provconfig from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall +from oslo_service import periodic_task from oslo_utils import importutils from neutron_lbaas.agent import agent_api @@ -56,7 +56,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): target = oslo_messaging.Target(version='1.0') def __init__(self, conf): - super(LbaasAgentManager, self).__init__() + super(LbaasAgentManager, self).__init__(conf) self.conf = conf self.context = ncontext.get_admin_context_without_session() self.serializer = agent_driver_base.DataModelSerializer() diff --git a/neutron_lbaas/agent/common_rpc.py b/neutron_lbaas/agent/common_rpc.py new file mode 100644 index 000000000..23f34c4e5 --- /dev/null +++ b/neutron_lbaas/agent/common_rpc.py @@ -0,0 +1,211 @@ +# Copyright (c) 2012 OpenStack Foundation. +# Copyright (c) 2014 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE(eezhova): This file is required to avoid circular dependency +# while porting neutron and neutron_lbaas to oslo_service. +# It will be removed afterwards. + +from neutron.common import exceptions +from neutron.common import rpc +from neutron import context +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging +from oslo_messaging import serializer as om_serializer +from oslo_service import service as common_service + + +LOG = logging.getLogger(__name__) + + +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exceptions.__name__, +] +EXTRA_EXMODS = [] + + +TRANSPORT_ALIASES = { + 'neutron.openstack.common.rpc.impl_fake': 'fake', + 'neutron.openstack.common.rpc.impl_qpid': 'qpid', + 'neutron.openstack.common.rpc.impl_kombu': 'rabbit', + 'neutron.openstack.common.rpc.impl_zmq': 'zmq', + 'neutron.rpc.impl_fake': 'fake', + 'neutron.rpc.impl_qpid': 'qpid', + 'neutron.rpc.impl_kombu': 'rabbit', + 'neutron.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = oslo_messaging.get_transport(conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + serializer = RequestContextSerializer() + NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +def get_client(target, version_cap=None, serializer=None): + assert rpc.TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return oslo_messaging.RPCClient(rpc.TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert rpc.TRANSPORT is not None + serializer = RequestContextSerializer(serializer) + return oslo_messaging.get_rpc_server(rpc.TRANSPORT, target, endpoints, + 'eventlet', serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert rpc.NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or cfg.CONF.host) + return rpc.NOTIFIER.prepare(publisher_id=publisher_id) + + +class RequestContextSerializer(om_serializer.Serializer): + """This serializer is used to convert RPC common context into + Neutron Context. + """ + def __init__(self, base=None): + super(RequestContextSerializer, self).__init__() + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + def serialize_context(self, ctxt): + return ctxt.to_dict() + + def deserialize_context(self, ctxt): + rpc_ctxt_dict = ctxt.copy() + user_id = rpc_ctxt_dict.pop('user_id', None) + if not user_id: + user_id = rpc_ctxt_dict.pop('user', None) + tenant_id = rpc_ctxt_dict.pop('tenant_id', None) + if not tenant_id: + tenant_id = rpc_ctxt_dict.pop('project_id', None) + return context.Context(user_id, tenant_id, **rpc_ctxt_dict) + + +class Service(common_service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host. + """ + def __init__(self, host, topic, manager=None, serializer=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + self.serializer = serializer + self.conn = None + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = create_connection(new=True) + LOG.debug("Creating Consumer connection for Service %s", + self.topic) + + endpoints = [self.manager] + + self.conn.create_consumer(self.topic, endpoints) + + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + + # Consume from all consumers in threads + self.conn.consume_in_threads() + + def stop(self, graceful=False): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop(graceful) + + +class Connection(object): + + def __init__(self): + super(Connection, self).__init__() + self.servers = [] + + def create_consumer(self, topic, endpoints, fanout=False): + target = oslo_messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=fanout) + server = get_server(target, endpoints) + self.servers.append(server) + + def consume_in_threads(self): + for server in self.servers: + server.start() + return self.servers + + def close(self): + for server in self.servers: + server.stop() + for server in self.servers: + server.wait() + + +# functions +def create_connection(new=True): + return Connection() diff --git a/neutron_lbaas/drivers/haproxy/synchronous_namespace_driver.py b/neutron_lbaas/drivers/haproxy/synchronous_namespace_driver.py index 20175b2b3..05d98b6e0 100644 --- a/neutron_lbaas/drivers/haproxy/synchronous_namespace_driver.py +++ b/neutron_lbaas/drivers/haproxy/synchronous_namespace_driver.py @@ -25,11 +25,11 @@ from neutron.common import exceptions from neutron import context as ncontext from neutron.extensions import portbindings from neutron.i18n import _LE, _LW -from neutron.openstack.common import service from neutron.plugins.common import constants from oslo_config import cfg from oslo_log import helpers as log_helpers from oslo_log import log as logging +from oslo_service import service from oslo_utils import excutils from oslo_utils import importutils diff --git a/neutron_lbaas/drivers/netscaler/netscaler_driver_v2.py b/neutron_lbaas/drivers/netscaler/netscaler_driver_v2.py index 110bf21d3..bbc8a696b 100644 --- a/neutron_lbaas/drivers/netscaler/netscaler_driver_v2.py +++ b/neutron_lbaas/drivers/netscaler/netscaler_driver_v2.py @@ -21,8 +21,8 @@ from oslo_log import log as logging from neutron import context as ncontext from neutron.i18n import _LE -from neutron.openstack.common import service from neutron.plugins.common import constants +from oslo_service import service from neutron_lbaas.drivers import driver_base from neutron_lbaas.drivers.driver_mixins import BaseManagerMixin diff --git a/neutron_lbaas/services/loadbalancer/agent/agent.py b/neutron_lbaas/services/loadbalancer/agent/agent.py index 35967d4dd..229527d7a 100644 --- a/neutron_lbaas/services/loadbalancer/agent/agent.py +++ b/neutron_lbaas/services/loadbalancer/agent/agent.py @@ -20,11 +20,11 @@ eventlet.monkey_patch() from neutron.agent.common import config from neutron.agent.linux import interface from neutron.common import config as common_config -from neutron.common import rpc as n_rpc from neutron.common import topics -from neutron.openstack.common import service from oslo_config import cfg +from oslo_service import service +from neutron_lbaas.agent import common_rpc as n_rpc from neutron_lbaas.services.loadbalancer.agent import agent_manager as manager OPTS = [ @@ -64,4 +64,4 @@ def main(): topic=topics.LOADBALANCER_AGENT, manager=mgr ) - service.launch(svc).wait() + service.launch(cfg.CONF, svc).wait() diff --git a/neutron_lbaas/services/loadbalancer/agent/agent_manager.py b/neutron_lbaas/services/loadbalancer/agent/agent_manager.py index 48ea9455d..472aeef83 100644 --- a/neutron_lbaas/services/loadbalancer/agent/agent_manager.py +++ b/neutron_lbaas/services/loadbalancer/agent/agent_manager.py @@ -18,13 +18,13 @@ from neutron.common import exceptions as n_exc from neutron.common import topics from neutron import context as ncontext from neutron.i18n import _LE, _LI -from neutron.openstack.common import loopingcall -from neutron.openstack.common import periodic_task from neutron.plugins.common import constants from neutron.services import provider_configuration as provconfig from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall +from oslo_service import periodic_task from oslo_utils import importutils from neutron_lbaas.services.loadbalancer.agent import agent_api @@ -59,7 +59,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): target = oslo_messaging.Target(version='2.0') def __init__(self, conf): - super(LbaasAgentManager, self).__init__() + super(LbaasAgentManager, self).__init__(conf) self.conf = conf self.context = ncontext.get_admin_context_without_session() self.plugin_rpc = agent_api.LbaasAgentApi( diff --git a/neutron_lbaas/services/loadbalancer/drivers/embrane/poller.py b/neutron_lbaas/services/loadbalancer/drivers/embrane/poller.py index cf8d224c4..2718510bf 100644 --- a/neutron_lbaas/services/loadbalancer/drivers/embrane/poller.py +++ b/neutron_lbaas/services/loadbalancer/drivers/embrane/poller.py @@ -17,10 +17,10 @@ from heleosapi import exceptions as h_exc from neutron import context from neutron.db import servicetype_db as sdb from neutron.i18n import _LE -from neutron.openstack.common import loopingcall from neutron.plugins.common import constants as ccon from neutron.plugins.embrane.common import contexts as embrane_ctx from oslo_log import log as logging +from oslo_service import loopingcall from neutron_lbaas.db.loadbalancer import loadbalancer_db as ldb from neutron_lbaas.services.loadbalancer.drivers.embrane \ diff --git a/neutron_lbaas/tests/unit/agent/test_agent.py b/neutron_lbaas/tests/unit/agent/test_agent.py index 8e7bdc287..2f3206ffd 100644 --- a/neutron_lbaas/tests/unit/agent/test_agent.py +++ b/neutron_lbaas/tests/unit/agent/test_agent.py @@ -45,4 +45,4 @@ class TestLbaasService(base.BaseTestCase): ) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro): agent.main() - mock_launch.assert_called_once_with(mock.ANY) + mock_launch.assert_called_once_with(mock.ANY, mock.ANY) diff --git a/neutron_lbaas/tests/unit/services/loadbalancer/agent/test_agent.py b/neutron_lbaas/tests/unit/services/loadbalancer/agent/test_agent.py index b92a66778..ae1e38a86 100644 --- a/neutron_lbaas/tests/unit/services/loadbalancer/agent/test_agent.py +++ b/neutron_lbaas/tests/unit/services/loadbalancer/agent/test_agent.py @@ -44,4 +44,4 @@ class TestLbaasService(base.BaseTestCase): ) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro): agent.main() - mock_launch.assert_called_once_with(mock.ANY) + mock_launch.assert_called_once_with(mock.ANY, mock.ANY) diff --git a/requirements.txt b/requirements.txt index 0077df411..b0db29f29 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ oslo.db>=1.10.0 # Apache-2.0 oslo.log>=1.2.0 # Apache-2.0 oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +oslo.service>=0.1.0 # Apache-2.0 oslo.utils>=1.6.0 # Apache-2.0 python-barbicanclient>=3.0.1 pyasn1