Improve the rpc module

1. Removed the rpc module and replaced it with messaging.
2. Simplified the service part and RPCHook for api.

Change-Id: Ia1c09081db5ae058cd4b39e083cf28a97130c48f
This commit is contained in:
zhangguoqing
2016-08-19 08:36:47 +00:00
parent 59ce21c3c1
commit 3ffa3fabd8
12 changed files with 127 additions and 112 deletions

View File

@@ -27,7 +27,6 @@ import pecan
from cloudkitty.api import config as api_config
from cloudkitty.api import hooks
from cloudkitty.i18n import _LI
from cloudkitty import rpc
from cloudkitty import storage
@@ -71,13 +70,9 @@ def get_pecan_config():
def setup_app(pecan_config=None, extra_hooks=None):
app_conf = get_pecan_config()
client = rpc.get_client()
storage_backend = storage.get_storage()
app_hooks = [
hooks.RPCHook(client),
hooks.RPCHook(),
hooks.StorageHook(storage_backend),
]

View File

@@ -19,11 +19,12 @@ from oslo_context import context
from pecan import hooks
from cloudkitty.common import policy
from cloudkitty import messaging
class RPCHook(hooks.PecanHook):
def __init__(self, rcp_client):
self._rpc_client = rcp_client
def __init__(self):
self._rpc_client = messaging.get_client()
def before(self, state):
state.request.rpc_client = self._rpc_client

View File

@@ -16,13 +16,11 @@
# @author: Stéphane Albert
#
from cloudkitty.api import app
from cloudkitty.common import rpc
from cloudkitty import service
def main():
service.prepare_service()
rpc.init()
server = app.build_server()
try:
server.serve_forever()

View File

@@ -15,14 +15,12 @@
#
# @author: Stéphane Albert
#
from cloudkitty.common import rpc
from cloudkitty import orchestrator
from cloudkitty import service
def main():
service.prepare_service()
rpc.init()
processor = orchestrator.Orchestrator()
try:
processor.process()

View File

@@ -1,43 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Stéphane Albert
#
from oslo_config import cfg
import oslo_messaging as messaging
TRANSPORT = None
def init():
global TRANSPORT
if not TRANSPORT:
TRANSPORT = messaging.get_transport(cfg.CONF)
return TRANSPORT
def get_client(target, version_cap=None):
assert TRANSPORT is not None
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap)
def get_server(target, endpoints):
assert TRANSPORT is not None
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet')

76
cloudkitty/messaging.py Normal file
View File

@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# Copyright 2016 99Cloud zhangguoqing <zhang.guoqing@99cloud.net>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
import oslo_messaging
DEFAULT_URL = "__default__"
RPC_TARGET = None
TRANSPORTS = {}
def setup():
oslo_messaging.set_transport_defaults('cloudkitty')
def get_transport(url=None, optional=False, cache=True):
"""Initialise the oslo_messaging layer."""
global TRANSPORTS, DEFAULT_URL
cache_key = url or DEFAULT_URL
transport = TRANSPORTS.get(cache_key)
if not transport or not cache:
try:
transport = oslo_messaging.get_transport(cfg.CONF, url)
except (oslo_messaging.InvalidTransportURL,
oslo_messaging.DriverLoadFailure):
if not optional or url:
# NOTE(sileht): oslo_messaging is configured but unloadable
# so reraise the exception
raise
return None
else:
if cache:
TRANSPORTS[cache_key] = transport
return transport
def get_target():
global RPC_TARGET
if RPC_TARGET is None:
RPC_TARGET = oslo_messaging.Target(topic='cloudkitty', version='1.0')
return RPC_TARGET
def get_client(version_cap=None):
transport = get_transport()
target = get_target()
return oslo_messaging.RPCClient(transport, target, version_cap=version_cap)
def get_server(target=None, endpoints=None):
transport = get_transport()
if not target:
target = get_target()
return oslo_messaging.get_rpc_server(transport, target,
endpoints, executor='eventlet')
def cleanup():
"""Cleanup the oslo_messaging layer."""
global TRANSPORTS, NOTIFIERS
NOTIFIERS = {}
for url in TRANSPORTS:
TRANSPORTS[url].cleanup()
del TRANSPORTS[url]

View File

@@ -24,15 +24,15 @@ import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import oslo_messaging
from stevedore import driver
from tooz import coordination
from cloudkitty import collector
from cloudkitty.common import rpc
from cloudkitty import config # noqa
from cloudkitty import extension_manager
from cloudkitty.i18n import _LI, _LW
from cloudkitty import messaging
from cloudkitty import storage
from cloudkitty import transformer
from cloudkitty import utils as ck_utils
@@ -57,8 +57,8 @@ PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors'
class RatingEndpoint(object):
target = messaging.Target(namespace='rating',
version='1.1')
target = oslo_messaging.Target(namespace='rating',
version='1.1')
def __init__(self, orchestrator):
self._global_reload = False
@@ -249,13 +249,13 @@ class Orchestrator(object):
random.shuffle(self._tenants)
def _init_messaging(self):
target = messaging.Target(topic='cloudkitty',
server=CONF.host,
version='1.0')
target = oslo_messaging.Target(topic='cloudkitty',
server=CONF.host,
version='1.0')
endpoints = [
self._rating_endpoint,
]
self.server = rpc.get_server(target, endpoints)
self.server = messaging.get_server(target, endpoints)
self.server.start()
def _check_state(self, tenant_id):

View File

@@ -23,7 +23,7 @@ import six
from cloudkitty.common import policy
from cloudkitty.db import api as db_api
from cloudkitty import rpc
from cloudkitty import messaging
@six.add_metaclass(abc.ABCMeta)
@@ -91,8 +91,8 @@ class RatingProcessorBase(object):
"""
api = db_api.get_instance()
module_db = api.get_module_info()
client = rpc.get_client().prepare(namespace='rating',
fanout=True)
client = messaging.get_client().prepare(namespace='rating',
fanout=True)
if enabled:
operation = 'enable_module'
else:
@@ -133,7 +133,8 @@ class RatingProcessorBase(object):
"""
def notify_reload(self):
client = rpc.get_client().prepare(namespace='rating', fanout=True)
client = messaging.get_client().prepare(namespace='rating',
fanout=True)
client.cast({}, 'reload_module', name=self.module_name)

View File

@@ -19,9 +19,12 @@ import socket
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_i18n
from oslo_log import log
from cloudkitty.common import defaults
from cloudkitty import messaging
from cloudkitty import version
service_opts = [
@@ -29,17 +32,24 @@ service_opts = [
default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.')
'However, the node name must be valid within an AMQP key, '
'and if using ZeroMQ, a valid hostname, FQDN, or IP address.')
]
cfg.CONF.register_opts(service_opts)
def prepare_service():
logging.register_options(cfg.CONF)
cfg.CONF(sys.argv[1:], project='cloudkitty')
defaults.set_config_defaults()
def prepare_service(argv=None, config_files=None):
oslo_i18n.enable_lazy()
log.register_options(cfg.CONF)
log.set_defaults()
defaults.set_cors_middleware_defaults()
logging.setup(cfg.CONF, 'cloudkitty')
if argv is None:
argv = sys.argv
cfg.CONF(argv[1:], project='cloudkitty', validate_default_values=True,
version=version.version_info.version_string(),
default_config_files=config_files)
log.setup(cfg.CONF, 'cloudkitty')
messaging.setup()

View File

@@ -24,7 +24,7 @@ import mock
from oslo_config import cfg
from oslo_config import fixture as conf_fixture
from oslo_db.sqlalchemy import utils
import oslo_messaging as messaging
import oslo_messaging
from oslo_messaging import conffixture
from oslo_policy import opts as policy_opts
import six
@@ -34,9 +34,9 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from cloudkitty.api import app
from cloudkitty.common import rpc
from cloudkitty import db
from cloudkitty.db import api as ck_db_api
from cloudkitty import messaging
from cloudkitty import rating
from cloudkitty import storage
from cloudkitty.storage.sqlalchemy import models
@@ -215,14 +215,14 @@ class BaseFakeRPC(fixture.GabbiFixture):
endpoint = None
def start_fixture(self):
rpc.init()
target = messaging.Target(topic='cloudkitty',
server=cfg.CONF.host,
version='1.0')
messaging.setup()
target = oslo_messaging.Target(topic='cloudkitty',
server=cfg.CONF.host,
version='1.0')
endpoints = [
self.endpoint()
]
self.server = rpc.get_server(target, endpoints)
self.server = messaging.get_server(target, endpoints)
self.server.start()
def stop_fixture(self):
@@ -231,8 +231,8 @@ class BaseFakeRPC(fixture.GabbiFixture):
class QuoteFakeRPC(BaseFakeRPC):
class FakeRPCEndpoint(object):
target = messaging.Target(namespace='rating',
version='1.0')
target = oslo_messaging.Target(namespace='rating',
version='1.0')
def quote(self, ctxt, res_data):
return str(1.0)
@@ -357,7 +357,7 @@ class CORSConfigFixture(fixture.GabbiFixture):
def setup_app():
rpc.init()
messaging.setup()
# FIXME(sheeprine): Extension fixtures are interacting with transformers
# loading, since collectors are not needed here we shunt them
no_collector = mock.patch(

View File

@@ -56,7 +56,7 @@ class RatingTest(tests.TestCase):
self.assertEqual(expected_infos, mod_infos)
def test_set_state_triggers_rpc(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
with mock.patch('cloudkitty.messaging.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(True)
self.assertTrue(self._fake_rpc._fanout)
@@ -74,7 +74,7 @@ class RatingTest(tests.TestCase):
self.assertEqual(expected_data, rpc_data)
def test_enable_module(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
with mock.patch('cloudkitty.messaging.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(True)
db_api = ck_db_api.get_instance()
@@ -82,7 +82,7 @@ class RatingTest(tests.TestCase):
self.assertTrue(module_db.get_state('fake'))
def test_disable_module(self):
with mock.patch('cloudkitty.rpc.get_client') as rpcmock:
with mock.patch('cloudkitty.messaging.get_client') as rpcmock:
rpcmock.return_value = self._fake_rpc
self._module.set_state(False)
db_api = ck_db_api.get_instance()

View File

@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -12,27 +10,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Guillaume Espanel
#
import oslo_messaging as messaging
from cloudkitty.common import rpc
_RPC_CLIENT = None
_RPC_TARGET = None
import pbr.version
def get_target():
global _RPC_TARGET
if _RPC_TARGET is None:
_RPC_TARGET = messaging.Target(topic='cloudkitty', version='1.0')
return _RPC_TARGET
def get_client():
global _RPC_CLIENT
if _RPC_CLIENT is None:
_RPC_CLIENT = rpc.get_client(get_target())
return _RPC_CLIENT
version_info = pbr.version.VersionInfo('cloudkitty')