Switch to oslo.service
oslo.service has graduated, so neutron_lbaas should consume it. As neutron_lbaas.agent.agent.LbaasAgentService subclasses neutron.common.rpc.Service which in its turn derives from oslo_service.service.Service in a result we get a circular dependency between neutron and neutron_lbaas. To avoid it, neutron.common.rpc to neutron_lbaas is temporarily copied to neutron_lbaas. It will be removed after both neutron and neutron_lbaas are ported to oslo_service. Partial-Bug: #1466851 Change-Id: I2093b37d411df9a26958fa50ff523c258bbe06ec Depends-On: I305cf53bad6213c151395e93d656b53a8a28e1db
This commit is contained in:
parent
67ca00a35f
commit
e318a3d7cf
@ -22,11 +22,11 @@ eventlet.monkey_patch()
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.openstack.common import service
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
|
||||
from neutron_lbaas.agent import agent_manager as manager
|
||||
from neutron_lbaas.agent import common_rpc as n_rpc
|
||||
from neutron_lbaas.services.loadbalancer import constants
|
||||
|
||||
OPTS = [
|
||||
@ -67,4 +67,4 @@ def main():
|
||||
topic=constants.LOADBALANCER_AGENTV2,
|
||||
manager=mgr
|
||||
)
|
||||
service.launch(svc).wait()
|
||||
service.launch(cfg.CONF, svc).wait()
|
||||
|
@ -17,13 +17,13 @@ from neutron.agent import rpc as agent_rpc
|
||||
from neutron.common import exceptions as n_exc
|
||||
from neutron import context as ncontext
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common import periodic_task
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.services import provider_configuration as provconfig
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import importutils
|
||||
|
||||
from neutron_lbaas.agent import agent_api
|
||||
@ -56,7 +56,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
target = oslo_messaging.Target(version='1.0')
|
||||
|
||||
def __init__(self, conf):
|
||||
super(LbaasAgentManager, self).__init__()
|
||||
super(LbaasAgentManager, self).__init__(conf)
|
||||
self.conf = conf
|
||||
self.context = ncontext.get_admin_context_without_session()
|
||||
self.serializer = agent_driver_base.DataModelSerializer()
|
||||
|
211
neutron_lbaas/agent/common_rpc.py
Normal file
211
neutron_lbaas/agent/common_rpc.py
Normal file
@ -0,0 +1,211 @@
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# NOTE(eezhova): This file is required to avoid circular dependency
|
||||
# while porting neutron and neutron_lbaas to oslo_service.
|
||||
# It will be removed afterwards.
|
||||
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import rpc
|
||||
from neutron import context
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_messaging import serializer as om_serializer
|
||||
from oslo_service import service as common_service
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
ALLOWED_EXMODS = [
|
||||
exceptions.__name__,
|
||||
]
|
||||
EXTRA_EXMODS = []
|
||||
|
||||
|
||||
TRANSPORT_ALIASES = {
|
||||
'neutron.openstack.common.rpc.impl_fake': 'fake',
|
||||
'neutron.openstack.common.rpc.impl_qpid': 'qpid',
|
||||
'neutron.openstack.common.rpc.impl_kombu': 'rabbit',
|
||||
'neutron.openstack.common.rpc.impl_zmq': 'zmq',
|
||||
'neutron.rpc.impl_fake': 'fake',
|
||||
'neutron.rpc.impl_qpid': 'qpid',
|
||||
'neutron.rpc.impl_kombu': 'rabbit',
|
||||
'neutron.rpc.impl_zmq': 'zmq',
|
||||
}
|
||||
|
||||
|
||||
def init(conf):
|
||||
global TRANSPORT, NOTIFIER
|
||||
exmods = get_allowed_exmods()
|
||||
TRANSPORT = oslo_messaging.get_transport(conf,
|
||||
allowed_remote_exmods=exmods,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
serializer = RequestContextSerializer()
|
||||
NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||
|
||||
|
||||
def cleanup():
|
||||
global TRANSPORT, NOTIFIER
|
||||
assert TRANSPORT is not None
|
||||
assert NOTIFIER is not None
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFIER = None
|
||||
|
||||
|
||||
def add_extra_exmods(*args):
|
||||
EXTRA_EXMODS.extend(args)
|
||||
|
||||
|
||||
def clear_extra_exmods():
|
||||
del EXTRA_EXMODS[:]
|
||||
|
||||
|
||||
def get_allowed_exmods():
|
||||
return ALLOWED_EXMODS + EXTRA_EXMODS
|
||||
|
||||
|
||||
def get_client(target, version_cap=None, serializer=None):
|
||||
assert rpc.TRANSPORT is not None
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return oslo_messaging.RPCClient(rpc.TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def get_server(target, endpoints, serializer=None):
|
||||
assert rpc.TRANSPORT is not None
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return oslo_messaging.get_rpc_server(rpc.TRANSPORT, target, endpoints,
|
||||
'eventlet', serializer)
|
||||
|
||||
|
||||
def get_notifier(service=None, host=None, publisher_id=None):
|
||||
assert rpc.NOTIFIER is not None
|
||||
if not publisher_id:
|
||||
publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
|
||||
return rpc.NOTIFIER.prepare(publisher_id=publisher_id)
|
||||
|
||||
|
||||
class RequestContextSerializer(om_serializer.Serializer):
|
||||
"""This serializer is used to convert RPC common context into
|
||||
Neutron Context.
|
||||
"""
|
||||
def __init__(self, base=None):
|
||||
super(RequestContextSerializer, self).__init__()
|
||||
self._base = base
|
||||
|
||||
def serialize_entity(self, ctxt, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.serialize_entity(ctxt, entity)
|
||||
|
||||
def deserialize_entity(self, ctxt, entity):
|
||||
if not self._base:
|
||||
return entity
|
||||
return self._base.deserialize_entity(ctxt, entity)
|
||||
|
||||
def serialize_context(self, ctxt):
|
||||
return ctxt.to_dict()
|
||||
|
||||
def deserialize_context(self, ctxt):
|
||||
rpc_ctxt_dict = ctxt.copy()
|
||||
user_id = rpc_ctxt_dict.pop('user_id', None)
|
||||
if not user_id:
|
||||
user_id = rpc_ctxt_dict.pop('user', None)
|
||||
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
|
||||
if not tenant_id:
|
||||
tenant_id = rpc_ctxt_dict.pop('project_id', None)
|
||||
return context.Context(user_id, tenant_id, **rpc_ctxt_dict)
|
||||
|
||||
|
||||
class Service(common_service.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
A service enables rpc by listening to queues based on topic and host.
|
||||
"""
|
||||
def __init__(self, host, topic, manager=None, serializer=None):
|
||||
super(Service, self).__init__()
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
self.serializer = serializer
|
||||
self.conn = None
|
||||
if manager is None:
|
||||
self.manager = self
|
||||
else:
|
||||
self.manager = manager
|
||||
|
||||
def start(self):
|
||||
super(Service, self).start()
|
||||
|
||||
self.conn = create_connection(new=True)
|
||||
LOG.debug("Creating Consumer connection for Service %s",
|
||||
self.topic)
|
||||
|
||||
endpoints = [self.manager]
|
||||
|
||||
self.conn.create_consumer(self.topic, endpoints)
|
||||
|
||||
# Hook to allow the manager to do other initializations after
|
||||
# the rpc connection is created.
|
||||
if callable(getattr(self.manager, 'initialize_service_hook', None)):
|
||||
self.manager.initialize_service_hook(self)
|
||||
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def stop(self, graceful=False):
|
||||
# Try to shut the connection down, but if we get any sort of
|
||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||
try:
|
||||
self.conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
super(Service, self).stop(graceful)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
|
||||
def __init__(self):
|
||||
super(Connection, self).__init__()
|
||||
self.servers = []
|
||||
|
||||
def create_consumer(self, topic, endpoints, fanout=False):
|
||||
target = oslo_messaging.Target(
|
||||
topic=topic, server=cfg.CONF.host, fanout=fanout)
|
||||
server = get_server(target, endpoints)
|
||||
self.servers.append(server)
|
||||
|
||||
def consume_in_threads(self):
|
||||
for server in self.servers:
|
||||
server.start()
|
||||
return self.servers
|
||||
|
||||
def close(self):
|
||||
for server in self.servers:
|
||||
server.stop()
|
||||
for server in self.servers:
|
||||
server.wait()
|
||||
|
||||
|
||||
# functions
|
||||
def create_connection(new=True):
|
||||
return Connection()
|
@ -25,11 +25,11 @@ from neutron.common import exceptions
|
||||
from neutron import context as ncontext
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.i18n import _LE, _LW
|
||||
from neutron.openstack.common import service
|
||||
from neutron.plugins.common import constants
|
||||
from oslo_config import cfg
|
||||
from oslo_log import helpers as log_helpers
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import importutils
|
||||
|
||||
|
@ -21,8 +21,8 @@ from oslo_log import log as logging
|
||||
|
||||
from neutron import context as ncontext
|
||||
from neutron.i18n import _LE
|
||||
from neutron.openstack.common import service
|
||||
from neutron.plugins.common import constants
|
||||
from oslo_service import service
|
||||
|
||||
from neutron_lbaas.drivers import driver_base
|
||||
from neutron_lbaas.drivers.driver_mixins import BaseManagerMixin
|
||||
|
@ -20,11 +20,11 @@ eventlet.monkey_patch()
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.openstack.common import service
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
|
||||
from neutron_lbaas.agent import common_rpc as n_rpc
|
||||
from neutron_lbaas.services.loadbalancer.agent import agent_manager as manager
|
||||
|
||||
OPTS = [
|
||||
@ -64,4 +64,4 @@ def main():
|
||||
topic=topics.LOADBALANCER_AGENT,
|
||||
manager=mgr
|
||||
)
|
||||
service.launch(svc).wait()
|
||||
service.launch(cfg.CONF, svc).wait()
|
||||
|
@ -18,13 +18,13 @@ from neutron.common import exceptions as n_exc
|
||||
from neutron.common import topics
|
||||
from neutron import context as ncontext
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common import periodic_task
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.services import provider_configuration as provconfig
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import importutils
|
||||
|
||||
from neutron_lbaas.services.loadbalancer.agent import agent_api
|
||||
@ -59,7 +59,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
target = oslo_messaging.Target(version='2.0')
|
||||
|
||||
def __init__(self, conf):
|
||||
super(LbaasAgentManager, self).__init__()
|
||||
super(LbaasAgentManager, self).__init__(conf)
|
||||
self.conf = conf
|
||||
self.context = ncontext.get_admin_context_without_session()
|
||||
self.plugin_rpc = agent_api.LbaasAgentApi(
|
||||
|
@ -17,10 +17,10 @@ from heleosapi import exceptions as h_exc
|
||||
from neutron import context
|
||||
from neutron.db import servicetype_db as sdb
|
||||
from neutron.i18n import _LE
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.plugins.common import constants as ccon
|
||||
from neutron.plugins.embrane.common import contexts as embrane_ctx
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
|
||||
from neutron_lbaas.db.loadbalancer import loadbalancer_db as ldb
|
||||
from neutron_lbaas.services.loadbalancer.drivers.embrane \
|
||||
|
@ -45,4 +45,4 @@ class TestLbaasService(base.BaseTestCase):
|
||||
) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro):
|
||||
agent.main()
|
||||
|
||||
mock_launch.assert_called_once_with(mock.ANY)
|
||||
mock_launch.assert_called_once_with(mock.ANY, mock.ANY)
|
||||
|
@ -44,4 +44,4 @@ class TestLbaasService(base.BaseTestCase):
|
||||
) as (mock_logging, mock_launch, sys_argv, mgr_cls, ro):
|
||||
agent.main()
|
||||
|
||||
mock_launch.assert_called_once_with(mock.ANY)
|
||||
mock_launch.assert_called_once_with(mock.ANY, mock.ANY)
|
||||
|
@ -14,6 +14,7 @@ oslo.db>=1.10.0 # Apache-2.0
|
||||
oslo.log>=1.2.0 # Apache-2.0
|
||||
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0
|
||||
oslo.serialization>=1.4.0 # Apache-2.0
|
||||
oslo.service>=0.1.0 # Apache-2.0
|
||||
oslo.utils>=1.6.0 # Apache-2.0
|
||||
python-barbicanclient>=3.0.1
|
||||
pyasn1
|
||||
|
Loading…
Reference in New Issue
Block a user