diff --git a/cloudkitty/api/app.py b/cloudkitty/api/app.py index 5ee8784f..4a4efee0 100644 --- a/cloudkitty/api/app.py +++ b/cloudkitty/api/app.py @@ -27,7 +27,6 @@ import pecan from cloudkitty.api import config as api_config from cloudkitty.api import hooks from cloudkitty.i18n import _LI -from cloudkitty import rpc from cloudkitty import storage @@ -71,13 +70,9 @@ def get_pecan_config(): def setup_app(pecan_config=None, extra_hooks=None): app_conf = get_pecan_config() - - client = rpc.get_client() - storage_backend = storage.get_storage() - app_hooks = [ - hooks.RPCHook(client), + hooks.RPCHook(), hooks.StorageHook(storage_backend), ] diff --git a/cloudkitty/api/hooks.py b/cloudkitty/api/hooks.py index dfedf54b..45736077 100644 --- a/cloudkitty/api/hooks.py +++ b/cloudkitty/api/hooks.py @@ -19,11 +19,12 @@ from oslo_context import context from pecan import hooks from cloudkitty.common import policy +from cloudkitty import messaging class RPCHook(hooks.PecanHook): - def __init__(self, rcp_client): - self._rpc_client = rcp_client + def __init__(self): + self._rpc_client = messaging.get_client() def before(self, state): state.request.rpc_client = self._rpc_client diff --git a/cloudkitty/cli/api.py b/cloudkitty/cli/api.py index 094f52a8..80a46cab 100644 --- a/cloudkitty/cli/api.py +++ b/cloudkitty/cli/api.py @@ -16,13 +16,11 @@ # @author: Stéphane Albert # from cloudkitty.api import app -from cloudkitty.common import rpc from cloudkitty import service def main(): service.prepare_service() - rpc.init() server = app.build_server() try: server.serve_forever() diff --git a/cloudkitty/cli/processor.py b/cloudkitty/cli/processor.py index 5790332b..5aaa13ab 100644 --- a/cloudkitty/cli/processor.py +++ b/cloudkitty/cli/processor.py @@ -15,14 +15,12 @@ # # @author: Stéphane Albert # -from cloudkitty.common import rpc from cloudkitty import orchestrator from cloudkitty import service def main(): service.prepare_service() - rpc.init() processor = orchestrator.Orchestrator() try: processor.process() diff --git a/cloudkitty/common/rpc.py b/cloudkitty/common/rpc.py deleted file mode 100644 index 37cd479b..00000000 --- a/cloudkitty/common/rpc.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 Objectif Libre -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Stéphane Albert -# -from oslo_config import cfg -import oslo_messaging as messaging - -TRANSPORT = None - - -def init(): - global TRANSPORT - if not TRANSPORT: - TRANSPORT = messaging.get_transport(cfg.CONF) - return TRANSPORT - - -def get_client(target, version_cap=None): - assert TRANSPORT is not None - return messaging.RPCClient(TRANSPORT, - target, - version_cap=version_cap) - - -def get_server(target, endpoints): - assert TRANSPORT is not None - return messaging.get_rpc_server(TRANSPORT, - target, - endpoints, - executor='eventlet') diff --git a/cloudkitty/messaging.py b/cloudkitty/messaging.py new file mode 100644 index 00000000..3ffa28ef --- /dev/null +++ b/cloudkitty/messaging.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 99Cloud zhangguoqing +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +import oslo_messaging + +DEFAULT_URL = "__default__" +RPC_TARGET = None +TRANSPORTS = {} + + +def setup(): + oslo_messaging.set_transport_defaults('cloudkitty') + + +def get_transport(url=None, optional=False, cache=True): + """Initialise the oslo_messaging layer.""" + global TRANSPORTS, DEFAULT_URL + cache_key = url or DEFAULT_URL + transport = TRANSPORTS.get(cache_key) + if not transport or not cache: + try: + transport = oslo_messaging.get_transport(cfg.CONF, url) + except (oslo_messaging.InvalidTransportURL, + oslo_messaging.DriverLoadFailure): + if not optional or url: + # NOTE(sileht): oslo_messaging is configured but unloadable + # so reraise the exception + raise + return None + else: + if cache: + TRANSPORTS[cache_key] = transport + return transport + + +def get_target(): + global RPC_TARGET + if RPC_TARGET is None: + RPC_TARGET = oslo_messaging.Target(topic='cloudkitty', version='1.0') + return RPC_TARGET + + +def get_client(version_cap=None): + transport = get_transport() + target = get_target() + return oslo_messaging.RPCClient(transport, target, version_cap=version_cap) + + +def get_server(target=None, endpoints=None): + transport = get_transport() + if not target: + target = get_target() + return oslo_messaging.get_rpc_server(transport, target, + endpoints, executor='eventlet') + + +def cleanup(): + """Cleanup the oslo_messaging layer.""" + global TRANSPORTS, NOTIFIERS + NOTIFIERS = {} + for url in TRANSPORTS: + TRANSPORTS[url].cleanup() + del TRANSPORTS[url] diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index e0241168..1e4b9c2e 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -24,15 +24,15 @@ import eventlet from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging +import oslo_messaging from stevedore import driver from tooz import coordination from cloudkitty import collector -from cloudkitty.common import rpc from cloudkitty import config # noqa from cloudkitty import extension_manager from cloudkitty.i18n import _LI, _LW +from cloudkitty import messaging from cloudkitty import storage from cloudkitty import transformer from cloudkitty import utils as ck_utils @@ -57,8 +57,8 @@ PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors' class RatingEndpoint(object): - target = messaging.Target(namespace='rating', - version='1.1') + target = oslo_messaging.Target(namespace='rating', + version='1.1') def __init__(self, orchestrator): self._global_reload = False @@ -249,13 +249,13 @@ class Orchestrator(object): random.shuffle(self._tenants) def _init_messaging(self): - target = messaging.Target(topic='cloudkitty', - server=CONF.host, - version='1.0') + target = oslo_messaging.Target(topic='cloudkitty', + server=CONF.host, + version='1.0') endpoints = [ self._rating_endpoint, ] - self.server = rpc.get_server(target, endpoints) + self.server = messaging.get_server(target, endpoints) self.server.start() def _check_state(self, tenant_id): diff --git a/cloudkitty/rating/__init__.py b/cloudkitty/rating/__init__.py index 0163e5d3..74d40aeb 100644 --- a/cloudkitty/rating/__init__.py +++ b/cloudkitty/rating/__init__.py @@ -23,7 +23,7 @@ import six from cloudkitty.common import policy from cloudkitty.db import api as db_api -from cloudkitty import rpc +from cloudkitty import messaging @six.add_metaclass(abc.ABCMeta) @@ -91,8 +91,8 @@ class RatingProcessorBase(object): """ api = db_api.get_instance() module_db = api.get_module_info() - client = rpc.get_client().prepare(namespace='rating', - fanout=True) + client = messaging.get_client().prepare(namespace='rating', + fanout=True) if enabled: operation = 'enable_module' else: @@ -133,7 +133,8 @@ class RatingProcessorBase(object): """ def notify_reload(self): - client = rpc.get_client().prepare(namespace='rating', fanout=True) + client = messaging.get_client().prepare(namespace='rating', + fanout=True) client.cast({}, 'reload_module', name=self.module_name) diff --git a/cloudkitty/service.py b/cloudkitty/service.py index d3d76355..d0c863cb 100644 --- a/cloudkitty/service.py +++ b/cloudkitty/service.py @@ -19,9 +19,12 @@ import socket import sys from oslo_config import cfg -from oslo_log import log as logging +import oslo_i18n +from oslo_log import log from cloudkitty.common import defaults +from cloudkitty import messaging +from cloudkitty import version service_opts = [ @@ -29,17 +32,24 @@ service_opts = [ default=socket.getfqdn(), help='Name of this node. This can be an opaque identifier. ' 'It is not necessarily a hostname, FQDN, or IP address. ' - 'However, the node name must be valid within ' - 'an AMQP key, and if using ZeroMQ, a valid ' - 'hostname, FQDN, or IP address.') + 'However, the node name must be valid within an AMQP key, ' + 'and if using ZeroMQ, a valid hostname, FQDN, or IP address.') ] cfg.CONF.register_opts(service_opts) -def prepare_service(): - logging.register_options(cfg.CONF) - cfg.CONF(sys.argv[1:], project='cloudkitty') - defaults.set_config_defaults() +def prepare_service(argv=None, config_files=None): + oslo_i18n.enable_lazy() + log.register_options(cfg.CONF) + log.set_defaults() + defaults.set_cors_middleware_defaults() - logging.setup(cfg.CONF, 'cloudkitty') + if argv is None: + argv = sys.argv + cfg.CONF(argv[1:], project='cloudkitty', validate_default_values=True, + version=version.version_info.version_string(), + default_config_files=config_files) + + log.setup(cfg.CONF, 'cloudkitty') + messaging.setup() diff --git a/cloudkitty/tests/gabbi/fixtures.py b/cloudkitty/tests/gabbi/fixtures.py index fc2cbaec..7701dd26 100644 --- a/cloudkitty/tests/gabbi/fixtures.py +++ b/cloudkitty/tests/gabbi/fixtures.py @@ -24,7 +24,7 @@ import mock from oslo_config import cfg from oslo_config import fixture as conf_fixture from oslo_db.sqlalchemy import utils -import oslo_messaging as messaging +import oslo_messaging from oslo_messaging import conffixture from oslo_policy import opts as policy_opts import six @@ -34,9 +34,9 @@ from wsme import types as wtypes import wsmeext.pecan as wsme_pecan from cloudkitty.api import app -from cloudkitty.common import rpc from cloudkitty import db from cloudkitty.db import api as ck_db_api +from cloudkitty import messaging from cloudkitty import rating from cloudkitty import storage from cloudkitty.storage.sqlalchemy import models @@ -215,14 +215,14 @@ class BaseFakeRPC(fixture.GabbiFixture): endpoint = None def start_fixture(self): - rpc.init() - target = messaging.Target(topic='cloudkitty', - server=cfg.CONF.host, - version='1.0') + messaging.setup() + target = oslo_messaging.Target(topic='cloudkitty', + server=cfg.CONF.host, + version='1.0') endpoints = [ self.endpoint() ] - self.server = rpc.get_server(target, endpoints) + self.server = messaging.get_server(target, endpoints) self.server.start() def stop_fixture(self): @@ -231,8 +231,8 @@ class BaseFakeRPC(fixture.GabbiFixture): class QuoteFakeRPC(BaseFakeRPC): class FakeRPCEndpoint(object): - target = messaging.Target(namespace='rating', - version='1.0') + target = oslo_messaging.Target(namespace='rating', + version='1.0') def quote(self, ctxt, res_data): return str(1.0) @@ -357,7 +357,7 @@ class CORSConfigFixture(fixture.GabbiFixture): def setup_app(): - rpc.init() + messaging.setup() # FIXME(sheeprine): Extension fixtures are interacting with transformers # loading, since collectors are not needed here we shunt them no_collector = mock.patch( diff --git a/cloudkitty/tests/test_rating.py b/cloudkitty/tests/test_rating.py index 0659b9e4..be36070d 100644 --- a/cloudkitty/tests/test_rating.py +++ b/cloudkitty/tests/test_rating.py @@ -56,7 +56,7 @@ class RatingTest(tests.TestCase): self.assertEqual(expected_infos, mod_infos) def test_set_state_triggers_rpc(self): - with mock.patch('cloudkitty.rpc.get_client') as rpcmock: + with mock.patch('cloudkitty.messaging.get_client') as rpcmock: rpcmock.return_value = self._fake_rpc self._module.set_state(True) self.assertTrue(self._fake_rpc._fanout) @@ -74,7 +74,7 @@ class RatingTest(tests.TestCase): self.assertEqual(expected_data, rpc_data) def test_enable_module(self): - with mock.patch('cloudkitty.rpc.get_client') as rpcmock: + with mock.patch('cloudkitty.messaging.get_client') as rpcmock: rpcmock.return_value = self._fake_rpc self._module.set_state(True) db_api = ck_db_api.get_instance() @@ -82,7 +82,7 @@ class RatingTest(tests.TestCase): self.assertTrue(module_db.get_state('fake')) def test_disable_module(self): - with mock.patch('cloudkitty.rpc.get_client') as rpcmock: + with mock.patch('cloudkitty.messaging.get_client') as rpcmock: rpcmock.return_value = self._fake_rpc self._module.set_state(False) db_api = ck_db_api.get_instance() diff --git a/cloudkitty/rpc.py b/cloudkitty/version.py similarity index 53% rename from cloudkitty/rpc.py rename to cloudkitty/version.py index e72ee3e6..628a4d23 100644 --- a/cloudkitty/rpc.py +++ b/cloudkitty/version.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 Objectif Libre # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,27 +10,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# -# @author: Guillaume Espanel -# -import oslo_messaging as messaging - -from cloudkitty.common import rpc -_RPC_CLIENT = None -_RPC_TARGET = None +import pbr.version - -def get_target(): - global _RPC_TARGET - if _RPC_TARGET is None: - _RPC_TARGET = messaging.Target(topic='cloudkitty', version='1.0') - return _RPC_TARGET - - -def get_client(): - global _RPC_CLIENT - if _RPC_CLIENT is None: - _RPC_CLIENT = rpc.get_client(get_target()) - return _RPC_CLIENT +version_info = pbr.version.VersionInfo('cloudkitty')