diff --git a/kingbird/api/api_config.py b/kingbird/api/api_config.py index 631a664..c30fbec 100644 --- a/kingbird/api/api_config.py +++ b/kingbird/api/api_config.py @@ -29,7 +29,6 @@ from kingbird.common.i18n import _LI # from kingbird import policy -from kingbird.common import rpc from kingbird.common import version LOG = logging.getLogger(__name__) @@ -73,8 +72,6 @@ def init(args, **kwargs): version='%%(prog)s %s' % version.version_info.release_string(), **kwargs) - rpc.init(cfg.CONF) - def setup_logging(): """Sets up the logging options for a log with supplied name.""" diff --git a/kingbird/api/controllers/quota_manager.py b/kingbird/api/controllers/quota_manager.py index 37d3214..74ce697 100644 --- a/kingbird/api/controllers/quota_manager.py +++ b/kingbird/api/controllers/quota_manager.py @@ -17,7 +17,6 @@ import collections from oslo_config import cfg from oslo_log import log as logging from oslo_log import versionutils -import oslo_messaging as messaging import pecan from pecan import expose from pecan import request @@ -27,11 +26,9 @@ import six from kingbird.common import exceptions from kingbird.common.i18n import _ -from kingbird.common import rpc -from kingbird.common.serializer import KingbirdSerializer as Serializer -from kingbird.common import topics from kingbird.common import utils from kingbird.db.sqlalchemy import api as db_api +from kingbird.rpc import client as rpc_client CONF = cfg.CONF @@ -57,18 +54,7 @@ class QuotaManagerController(object): def __init__(self, *args, **kwargs): super(QuotaManagerController, self).__init__(*args, **kwargs) - target = messaging.Target(topic=topics.TOPIC_KB_ENGINE, version='1.0') - upgrade_level = CONF.upgrade_levels.kb_engine - version_cap = 1.0 - if upgrade_level == 'auto': - version_cap = self._determine_version_cap(target) - else: - version_cap = self.VERSION_ALIASES.get(upgrade_level, - upgrade_level) - serializer = Serializer() - self.client = rpc.get_client(target, - version_cap=version_cap, - serializer=serializer) + self.rpc_client = rpc_client.EngineClient() # to do the version compatibility for future purpose def _determine_version_cap(self, target): @@ -95,10 +81,8 @@ class QuotaManagerController(object): pecan.abort(404, _('Invalid request URL')) elif action == 'detail': # Get the current quota usages for a project - result = self.client.call( - context, - 'get_total_usage_for_tenant', - project_id=project_id) + result = self.rpc_client.get_total_usage_for_tenant( + context, project_id) else: # Get quota limits for all the resources for a project result = db_api.quota_get_all_by_project( @@ -184,8 +168,8 @@ class QuotaManagerController(object): if not context.is_admin: pecan.abort(403, _('Admin required')) - self.client.cast(context, 'quota_sync_for_project', - project_id=project_id) + self.rpc_client.quota_sync_for_project( + context, project_id) return 'triggered quota sync for ' + project_id @staticmethod diff --git a/kingbird/api/controllers/restcomm.py b/kingbird/api/controllers/restcomm.py index d0ea651..7602487 100644 --- a/kingbird/api/controllers/restcomm.py +++ b/kingbird/api/controllers/restcomm.py @@ -37,4 +37,4 @@ def extract_context_from_environ(): role = environ.get('HTTP_X_ROLE') context_paras['is_admin'] = role == 'admin' - return k_context.Context(**context_paras) + return k_context.RequestContext(**context_paras) diff --git a/kingbird/cmd/api.py b/kingbird/cmd/api.py old mode 100755 new mode 100644 index 500be7f..c5459eb --- a/kingbird/cmd/api.py +++ b/kingbird/cmd/api.py @@ -19,8 +19,10 @@ import sys +import eventlet from oslo_config import cfg from oslo_log import log as logging +from oslo_service import systemd from oslo_service import wsgi import logging as std_logging @@ -31,10 +33,12 @@ from kingbird.api import app from kingbird.common import config from kingbird.common.i18n import _LI from kingbird.common.i18n import _LW +from kingbird.common import messaging CONF = cfg.CONF config.register_options() LOG = logging.getLogger('kingbird.api') +eventlet.monkey_patch(os=False) def main(): @@ -52,7 +56,8 @@ def main(): LOG.info(_LI("Server on http://%(host)s:%(port)s with %(workers)s"), {'host': host, 'port': port, 'workers': workers}) - + messaging.setup() + systemd.notify_once() service = wsgi.Server(CONF, "Kingbird", application, host, port) app.serve(service, CONF, workers) diff --git a/kingbird/cmd/engine.py b/kingbird/cmd/engine.py old mode 100755 new mode 100644 index 816f57b..babe688 --- a/kingbird/cmd/engine.py +++ b/kingbird/cmd/engine.py @@ -1,64 +1,54 @@ -# Copyright 2015 Huawei Technologies Co., Ltd. +#!/usr/bin/env python # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Kingbird Engine Server. +""" import eventlet - -if __name__ == "__main__": - eventlet.monkey_patch() - -import sys +eventlet.monkey_patch() from oslo_config import cfg +from oslo_i18n import _lazy from oslo_log import log as logging - -import logging as std_logging +from oslo_service import service from kingbird.common import config +from kingbird.common import consts +from kingbird.common import messaging -from kingbird.common.i18n import _LI -from kingbird.common.i18n import _LW -from kingbird.engine import engine_config -from kingbird.engine import service - -CONF = cfg.CONF +_lazy.enable_lazy() config.register_options() LOG = logging.getLogger('kingbird.engine') def main(): - engine_config.init(sys.argv[1:]) - engine_config.setup_logging() + logging.register_options(cfg.CONF) + cfg.CONF(project='kingbird', prog='kingbird-engine') + logging.setup(cfg.CONF, 'kingbird-engine') + logging.set_defaults() + messaging.setup() - host = CONF.host - workers = CONF.workers - - if workers < 1: - LOG.warning(_LW("Wrong worker number, worker = %(workers)s"), workers) - workers = 1 - - LOG.info(_LI("Server on http://%(host)s with %(workers)s"), - {'host': host, 'workers': workers}) - - engine_service = service.create_service() - service.serve(engine_service, workers) - service.wait() - - LOG.info(_LI("Configuration:")) - CONF.log_opt_values(LOG, std_logging.INFO) + from kingbird.engine import service as engine + srv = engine.EngineService(cfg.CONF.host, + consts.TOPIC_KB_ENGINE) + launcher = service.launch(cfg.CONF, + srv, workers=cfg.CONF.workers) + # the following periodic tasks are intended serve as HA checking + # srv.create_periodic_tasks() + launcher.wait() if __name__ == '__main__': main() diff --git a/kingbird/common/config.py b/kingbird/common/config.py index 063b8dd..662d741 100644 --- a/kingbird/common/config.py +++ b/kingbird/common/config.py @@ -128,6 +128,27 @@ cache_opts = [ ' auto_refresh_endpoint set to True') ] +scheduler_opts = [ + cfg.BoolOpt('periodic_enable', + default=True, + help='boolean value for enable/disenable periodic tasks'), + cfg.IntOpt('periodic_interval', + default=900, + help='periodic time interval for automatic quota sync job') +] + + +common_opts = [ + cfg.IntOpt('workers', default=1, + help='number of workers'), + cfg.StrOpt('host', + default='localhost', + help='hostname of the machine') +] + + +scheduler_opt_group = cfg.OptGroup('scheduler', + title='Scheduler options for periodic job') # The group stores Kingbird global limit for all the projects default_quota_group = cfg.OptGroup(name='kingbird_global_limit', title='Global quota limit for all projects') @@ -141,7 +162,9 @@ def list_opts(): yield default_quota_group.name, neutron_quotas yield default_quota_group.name, cinder_quotas yield cache_opt_group.name, cache_opts + yield scheduler_opt_group.name, scheduler_opts yield None, global_opts + yield None, common_opts def register_options(): diff --git a/kingbird/common/consts.py b/kingbird/common/consts.py index 39378ea..275facb 100644 --- a/kingbird/common/consts.py +++ b/kingbird/common/consts.py @@ -35,3 +35,7 @@ NEUTRON_QUOTA_FIELDS = ("network", "security_group", "security_group_rule", ) + +RPC_API_VERSION = "1.0" + +TOPIC_KB_ENGINE = "kingbird-engine" diff --git a/kingbird/common/context.py b/kingbird/common/context.py index c4a77c4..f172abc 100644 --- a/kingbird/common/context.py +++ b/kingbird/common/context.py @@ -1,84 +1,124 @@ -# Copyright 2015 Huawei Technologies Co., Ltd. -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. -from oslo_context import context as oslo_ctx +from oslo_context import context as base_context +from oslo_utils import encodeutils + +from kingbird.common import policy +from kingbird.db import api as db_api -class ContextBase(oslo_ctx.RequestContext): - def __init__(self, auth_token=None, user_id=None, tenant_id=None, - is_admin=False, request_id=None, overwrite=True, - user_name=None, tenant_name=None, auth_url=None, - region=None, password=None, domain='default', - project_name=None, **kwargs): - super(ContextBase, self).__init__( - auth_token=auth_token, - user=user_id or kwargs.get('user', None), - tenant=tenant_id or kwargs.get('tenant', None), - domain=kwargs.get('domain', None), - user_domain=kwargs.get('user_domain', None), - project_domain=kwargs.get('project_domain', None), - is_admin=is_admin, - read_only=kwargs.get('read_only', False), - show_deleted=kwargs.get('show_deleted', False), - request_id=request_id, - resource_uuid=kwargs.get('resource_uuid', None), - overwrite=overwrite) - self.user_name = user_name - self.tenant_name = tenant_name - self.tenant_id = tenant_id - self.auth_url = auth_url - self.password = password - self.default_name = domain - self.region_name = region - self.project_name = project_name +class RequestContext(base_context.RequestContext): + '''Stores information about the security context. - def to_dict(self): - ctx_dict = super(ContextBase, self).to_dict() - ctx_dict.update({ - 'user_name': self.user_name, - 'tenant_name': self.tenant_name, - 'project_name': self.project_name, - 'tenant_id': self.tenant_id, - 'auth_url': self.auth_url, - 'password': self.password, - 'default_name': self.default_name, - 'region_name': self.region_name, - }) - return ctx_dict + The context encapsulates information related to the user accessing the + the system, as well as additional request information. + ''' - @classmethod - def from_dict(cls, ctx): - return cls(**ctx) + def __init__(self, auth_token=None, user=None, project=None, + domain=None, user_domain=None, project_domain=None, + is_admin=None, read_only=False, show_deleted=False, + request_id=None, auth_url=None, trusts=None, + user_name=None, project_name=None, domain_name=None, + user_domain_name=None, project_domain_name=None, + auth_token_info=None, region_name=None, roles=None, + password=None, **kwargs): + '''Initializer of request context.''' + # We still have 'tenant' param because oslo_context still use it. + super(RequestContext, self).__init__( + auth_token=auth_token, user=user, tenant=project, + domain=domain, user_domain=user_domain, + project_domain=project_domain, + read_only=read_only, show_deleted=show_deleted, + request_id=request_id) -class Context(ContextBase): - def __init__(self, **kwargs): - super(Context, self).__init__(**kwargs) + # request_id might be a byte array + self.request_id = encodeutils.safe_decode(self.request_id) + + # we save an additional 'project' internally for use + self.project = project + + # Session for DB access self._session = None + self.auth_url = auth_url + self.trusts = trusts + + self.user_name = user_name + self.project_name = project_name + self.domain_name = domain_name + self.user_domain_name = user_domain_name + self.project_domain_name = project_domain_name + + self.auth_token_info = auth_token_info + self.region_name = region_name + self.roles = roles or [] + self.password = password + + # Check user is admin or not + if is_admin is None: + self.is_admin = policy.enforce(self, 'context_is_admin', + target={'project': self.project}, + do_raise=False) + else: + self.is_admin = is_admin + @property def session(self): - # todo get db session in the context - # if not self._session: - # self._session = dal.get_session() + if self._session is None: + self._session = db_api.get_session() return self._session + def to_dict(self): + return { + 'auth_url': self.auth_url, + 'auth_token': self.auth_token, + 'auth_token_info': self.auth_token_info, + 'user': self.user, + 'user_name': self.user_name, + 'user_domain': self.user_domain, + 'user_domain_name': self.user_domain_name, + 'project': self.project, + 'project_name': self.project_name, + 'project_domain': self.project_domain, + 'project_domain_name': self.project_domain_name, + 'domain': self.domain, + 'domain_name': self.domain_name, + 'trusts': self.trusts, + 'region_name': self.region_name, + 'roles': self.roles, + 'show_deleted': self.show_deleted, + 'is_admin': self.is_admin, + 'request_id': self.request_id, + 'password': self.password, + } -def get_admin_context(read_only=True): - return ContextBase(user_id=None, - project_id=None, - is_admin=True, - overwrite=False, - read_only=read_only) + @classmethod + def from_dict(cls, values): + return cls(**values) + + +def get_admin_context(show_deleted=False): + return RequestContext(is_admin=True, show_deleted=show_deleted) + + +def get_service_context(**args): + '''An abstraction layer for getting service context. + + There could be multiple cloud backends for kingbird to use. This + abstraction layer provides an indirection for kingbird to get the + credentials of 'kingbird' user on the specific cloud. By default, + this credential refers to the credentials built for keystone middleware + in an OpenStack cloud. + ''' + pass diff --git a/kingbird/common/manager.py b/kingbird/common/manager.py index cb90c2b..e5d6a02 100644 --- a/kingbird/common/manager.py +++ b/kingbird/common/manager.py @@ -42,20 +42,12 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_service import periodic_task +from kingbird.common import config CONF = cfg.CONF +config.register_options() LOG = logging.getLogger(__name__) -host_opts = [ - cfg.StrOpt('host', - default='localhost', - help='hostname of the machine') -] - -host_opt_group = cfg.OptGroup('host_details') -cfg.CONF.register_group(host_opt_group) -cfg.CONF.register_opts(host_opts, group=host_opt_group) - class PeriodicTasks(periodic_task.PeriodicTasks): def __init__(self): @@ -66,7 +58,7 @@ class Manager(PeriodicTasks): def __init__(self, host=None, service_name='undefined'): if not host: - host = cfg.CONF.host_details.host + host = cfg.CONF.host self.host = host self.service_name = service_name # self.notifier = rpc.get_notifier(self.service_name, self.host) @@ -123,7 +115,3 @@ class Manager(PeriodicTasks): """ pass - - -def list_opts(): - yield host_opt_group.name, host_opts diff --git a/kingbird/common/messaging.py b/kingbird/common/messaging.py new file mode 100644 index 0000000..a6f6c03 --- /dev/null +++ b/kingbird/common/messaging.py @@ -0,0 +1,111 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet + +from oslo_config import cfg +import oslo_messaging +from oslo_serialization import jsonutils + +from kingbird.common import context + +TRANSPORT = None +NOTIFIER = None + +_ALIASES = { + 'kingbird.openstack.common.rpc.impl_kombu': 'rabbit', + 'kingbird.openstack.common.rpc.impl_qpid': 'qpid', + 'kingbird.openstack.common.rpc.impl_zmq': 'zmq', +} + + +class RequestContextSerializer(oslo_messaging.Serializer): + def __init__(self, base): + self._base = base + + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) + + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + @staticmethod + def serialize_context(ctxt): + return ctxt.to_dict() + + @staticmethod + def deserialize_context(ctxt): + return context.RequestContext.from_dict(ctxt) + + +class JsonPayloadSerializer(oslo_messaging.NoOpSerializer): + @classmethod + def serialize_entity(cls, context, entity): + return jsonutils.to_primitive(entity, convert_instances=True) + + +def setup(url=None, optional=False): + """Initialise the oslo_messaging layer.""" + global TRANSPORT, NOTIFIER + + if url and url.startswith("fake://"): + # NOTE: oslo_messaging fake driver uses time.sleep + # for task switch, so we need to monkey_patch it + eventlet.monkey_patch(time=True) + + if not TRANSPORT: + oslo_messaging.set_transport_defaults('kingbird') + exmods = ['kingbird.common.exception'] + try: + TRANSPORT = oslo_messaging.get_transport( + cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES) + except oslo_messaging.InvalidTransportURL as e: + TRANSPORT = None + if not optional or e.url: + raise + + if not NOTIFIER and TRANSPORT: + serializer = RequestContextSerializer(JsonPayloadSerializer()) + NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer) + + +def cleanup(): + """Cleanup the oslo_messaging layer.""" + global TRANSPORT, NOTIFIER + if TRANSPORT: + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def get_rpc_server(target, endpoint): + """Return a configured oslo_messaging rpc server.""" + serializer = RequestContextSerializer(JsonPayloadSerializer()) + return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint], + executor='eventlet', + serializer=serializer) + + +def get_rpc_client(**kwargs): + """Return a configured oslo_messaging RPCClient.""" + target = oslo_messaging.Target(**kwargs) + serializer = RequestContextSerializer(JsonPayloadSerializer()) + return oslo_messaging.RPCClient(TRANSPORT, target, + serializer=serializer) + + +def get_notifier(publisher_id): + """Return a configured oslo_messaging notifier.""" + return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/kingbird/common/policy.py b/kingbird/common/policy.py new file mode 100644 index 0000000..efc22a8 --- /dev/null +++ b/kingbird/common/policy.py @@ -0,0 +1,49 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Policy Engine For Kingbird +""" + +# from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_policy import policy + +from kingbird.common import exceptions + +POLICY_ENFORCER = None +CONF = cfg.CONF + + +# @lockutils.synchronized('policy_enforcer', 'kingbird-') +def _get_enforcer(policy_file=None, rules=None, default_rule=None): + + global POLICY_ENFORCER + + if POLICY_ENFORCER is None: + POLICY_ENFORCER = policy.Enforcer(CONF, + policy_file=policy_file, + rules=rules, + default_rule=default_rule) + return POLICY_ENFORCER + + +def enforce(context, rule, target, do_raise=True, *args, **kwargs): + + enforcer = _get_enforcer() + credentials = context.to_dict() + target = target or {} + if do_raise: + kwargs.update(exc=exceptions.Forbidden) + + return enforcer.enforce(rule, target, credentials, do_raise, + *args, **kwargs) diff --git a/kingbird/common/rpc.py b/kingbird/common/rpc.py deleted file mode 100644 index fa82ae8..0000000 --- a/kingbird/common/rpc.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright 2015 Huawei Technologies Co., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# copy and modify from Nova - -__all__ = [ - 'init', - 'cleanup', - 'set_defaults', - 'add_extra_exmods', - 'clear_extra_exmods', - 'get_allowed_exmods', - 'RequestContextSerializer', - 'get_client', - 'get_server', - 'get_notifier', -] - -from oslo_config import cfg -import oslo_messaging as messaging -from oslo_serialization import jsonutils - -import kingbird.common.context -import kingbird.common.exceptions - -CONF = cfg.CONF -TRANSPORT = None -NOTIFIER = None - -ALLOWED_EXMODS = [ - kingbird.common.exceptions.__name__, -] -EXTRA_EXMODS = [] - - -def init(conf): - global TRANSPORT, NOTIFIER - exmods = get_allowed_exmods() - TRANSPORT = messaging.get_transport(conf, - allowed_remote_exmods=exmods) - serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) - - -def cleanup(): - global TRANSPORT, NOTIFIER - assert TRANSPORT is not None - assert NOTIFIER is not None - TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None - - -def set_defaults(control_exchange): - messaging.set_transport_defaults(control_exchange) - - -def add_extra_exmods(*args): - EXTRA_EXMODS.extend(args) - - -def clear_extra_exmods(): - del EXTRA_EXMODS[:] - - -def get_allowed_exmods(): - return ALLOWED_EXMODS + EXTRA_EXMODS - - -class JsonPayloadSerializer(messaging.NoOpSerializer): - @staticmethod - def serialize_entity(context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - -class RequestContextSerializer(messaging.Serializer): - - def __init__(self, base): - self._base = base - - def serialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.serialize_entity(context, entity) - - def deserialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.deserialize_entity(context, entity) - - def serialize_context(self, context): - return context.to_dict() - - def deserialize_context(self, context): - return kingbird.common.context.Context.from_dict(context) - - -def get_transport_url(url_str=None): - return messaging.TransportURL.parse(CONF, url_str) - - -def get_client(target, version_cap=None, serializer=None): - assert TRANSPORT is not None - serializer = RequestContextSerializer(serializer) - return messaging.RPCClient(TRANSPORT, - target, - version_cap=version_cap, - serializer=serializer) - - -def get_server(target, endpoints, serializer=None): - assert TRANSPORT is not None - serializer = RequestContextSerializer(serializer) - return messaging.get_rpc_server(TRANSPORT, - target, - endpoints, - executor='eventlet', - serializer=serializer) - - -def get_notifier(service, host=None, publisher_id=None): - assert NOTIFIER is not None - if not publisher_id: - publisher_id = "%s.%s" % (service, host or CONF.host) - return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/kingbird/common/service.py b/kingbird/common/service.py deleted file mode 100644 index 16f3241..0000000 --- a/kingbird/common/service.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright (c) 2015 Huawei, Tech. Co,. Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# copy and modify from Nova - -"""Generic service base class for all workers that run on hosts.""" - -import os -import random -import sys - -from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging as messaging -from oslo_service import service - -import baserpc - -from kingbird.common.i18n import _ -from kingbird.common.i18n import _LE -from kingbird.common.i18n import _LI - -from kingbird.common import context -from kingbird.common import rpc -from kingbird.common import version - -LOG = logging.getLogger(__name__) - -service_opts = [ - cfg.IntOpt('report_interval', - default=10, - help='Seconds between nodes reporting state to datastore'), - cfg.BoolOpt('periodic_enable', - default=True, - help='Enable periodic tasks'), - cfg.IntOpt('periodic_fuzzy_delay', - default=60, - help='Range of seconds to randomly delay when starting the' - ' periodic task scheduler to reduce stampeding.' - ' (Disable by setting to 0)'), - ] - -CONF = cfg.CONF -CONF.register_opts(service_opts) - - -class Service(service.Service): - - """class Service - - Service object for binaries running on hosts. - A service takes a manager and enables rpc by listening to queues based - on topic. It also periodically runs tasks on the manager and reports - its state to the database services table. - """ - - def __init__(self, host, binary, topic, manager, report_interval=None, - periodic_enable=None, periodic_fuzzy_delay=None, - periodic_interval_max=None, serializer=None, - *args, **kwargs): - super(Service, self).__init__() - self.host = host - self.binary = binary - self.topic = topic - self.manager = manager - self.rpc_server = None - self.report_interval = report_interval - self.periodic_enable = periodic_enable - self.periodic_fuzzy_delay = periodic_fuzzy_delay - self.interval_max = periodic_interval_max - self.serializer = serializer - self.saved_args, self.saved_kwargs = args, kwargs - - def start(self): - ver_str = version.version_string_with_package() - LOG.info(_LI('Starting %(topic)s node (version %(version)s)'), - {'topic': self.topic, 'version': ver_str}) - - self.basic_config_check() - self.manager.init_host() - self.manager.pre_start_hook() - - LOG.debug(_("Creating RPC server for service %s"), self.topic) - - target = messaging.Target(topic=self.topic, server=self.host) - - endpoints = [ - self.manager, - baserpc.BaseServerRPCAPI(self.manager.service_name) - ] - endpoints.extend(self.manager.additional_endpoints) - - self.rpc_server = rpc.get_server(target, endpoints, self.serializer) - self.rpc_server.start() - - self.manager.post_start_hook() - - if self.periodic_enable: - if self.periodic_fuzzy_delay: - initial_delay = random.randint(0, self.periodic_fuzzy_delay) - else: - initial_delay = None - - self.tg.add_dynamic_timer(self.periodic_tasks, - initial_delay=initial_delay, - periodic_interval_max=self.interval_max) - - def __getattr__(self, key): - manager = self.__dict__.get('manager', None) - return getattr(manager, key) - - @classmethod - def create(cls, host=None, binary=None, topic=None, manager=None, - report_interval=None, periodic_enable=None, - periodic_fuzzy_delay=None, periodic_interval_max=None, - serializer=None,): - - """Instantiates class and passes back application object. - - :param host: defaults to CONF.host - :param binary: defaults to basename of executable - :param topic: defaults to bin_name - 'nova-' part - :param manager: defaults to CONF._manager - :param report_interval: defaults to CONF.report_interval - :param periodic_enable: defaults to CONF.periodic_enable - :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay - :param periodic_interval_max: if set, the max time to wait between runs - """ - - if not host: - host = CONF.host - if not binary: - binary = os.path.basename(sys.argv[0]) - if not topic: - topic = binary.rpartition('kingbird-')[2] - if not manager: - manager_cls = ('%s_manager' % - binary.rpartition('kingbird-')[2]) - manager = CONF.get(manager_cls, None) - if report_interval is None: - report_interval = CONF.report_interval - if periodic_enable is None: - periodic_enable = CONF.periodic_enable - if periodic_fuzzy_delay is None: - periodic_fuzzy_delay = CONF.periodic_fuzzy_delay - - service_obj = cls(host, binary, topic, manager, - report_interval=report_interval, - periodic_enable=periodic_enable, - periodic_fuzzy_delay=periodic_fuzzy_delay, - periodic_interval_max=periodic_interval_max, - serializer=serializer) - - return service_obj - - def kill(self): - self.stop() - - def stop(self): - try: - self.rpc_server.stop() - self.rpcserver.wait() - except Exception: - pass - - try: - self.manager.cleanup_host() - except Exception: - LOG.exception(_LE('Service error occurred during cleanup_host')) - pass - - super(Service, self).stop() - - def periodic_tasks(self, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - ctxt = context.get_admin_context() - return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) - - def basic_config_check(self): - """Perform basic config checks before starting processing.""" - # Make sure the tempdir exists and is writable - # try: - # with utils.tempdir(): - # pass - # except Exception as e: - # LOG.error(_LE('Temporary directory is invalid: %s'), e) - # sys.exit(1) diff --git a/kingbird/common/topics.py b/kingbird/common/topics.py deleted file mode 100644 index 2050379..0000000 --- a/kingbird/common/topics.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2015 Huawei Technologies Co., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -CREATE = 'create' -DELETE = 'delete' -UPDATE = 'update' - -TOPIC_KB_ENGINE = 'engine' diff --git a/kingbird/engine/engine_config.py b/kingbird/engine/engine_config.py deleted file mode 100644 index bdb67f5..0000000 --- a/kingbird/engine/engine_config.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright 2016 Ericsson AB -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Routines for configuring kingbird, largely copy from Neutron -""" - -import sys - -from oslo_config import cfg -from oslo_log import log as logging - -from kingbird.common.i18n import _ -from kingbird.common.i18n import _LI -from kingbird.common import rpc -from kingbird.common import version - -LOG = logging.getLogger(__name__) - -common_opts = [ - cfg.StrOpt('host', default='kingbird.host', - help=_("The host name for RPC server")), - cfg.IntOpt('workers', default=2, - help=_("number of workers")), - cfg.StrOpt('state_path', - default='/var/lib/kingbird', - deprecated_name='pybasedir', - help="Top-level directory for maintaining kingbird's state"), -] - - -def init(args, **kwargs): - # Register the configuration options - cfg.CONF.register_opts(common_opts) - - # ks_session.Session.register_conf_options(cfg.CONF) - # auth.register_conf_options(cfg.CONF) - logging.register_options(cfg.CONF) - - cfg.CONF(args=args, project='kingbird.engine', - version='%%(prog)s %s' % version.version_info.release_string(), - **kwargs) - - rpc.init(cfg.CONF) - - -def setup_logging(): - """Sets up the logging options for a log with supplied name.""" - product_name = "kingbird.engine" - logging.setup(cfg.CONF, product_name) - LOG.info(_LI("Logging enabled!")) - LOG.info(_LI("%(prog)s version %(version)s"), - {'prog': sys.argv[0], - 'version': version.version_info.release_string()}) - LOG.debug("command line: %s", " ".join(sys.argv)) - - -def reset_service(): - # Reset worker in case SIGHUP is called. - # Note that this is called only in case a service is running in - # daemon mode. - setup_logging() - - # TODO(joehuang) enforce policy later - # policy.refresh() - - -def list_opts(): - yield None, common_opts diff --git a/kingbird/engine/listener.py b/kingbird/engine/listener.py deleted file mode 100644 index 7d75bf0..0000000 --- a/kingbird/engine/listener.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2016 Ericsson AB -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -import uuid - -from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging as messaging - -from kingbird.common.i18n import _ -from kingbird.common.i18n import _LI -from kingbird.common import manager -from kingbird.engine.quota_manager import QuotaManager -from kingbird.engine import scheduler - - -CONF = cfg.CONF -LOG = logging.getLogger(__name__) - -scheduler_opts = [ - cfg.BoolOpt('periodic_enable', - default=True, - help='boolean value for enable/disenable periodic tasks'), - cfg.IntOpt('periodic_interval', - default=900, - help='periodic time interval for automatic quota sync job') -] - -scheduler_opt_group = cfg.OptGroup('scheduler') -cfg.CONF.register_group(scheduler_opt_group) -cfg.CONF.register_opts(scheduler_opts, group=scheduler_opt_group) - - -class EngineManager(manager.Manager): - """Manages all the kb engine activities.""" - - target = messaging.Target(version='1.0') - - def __init__(self, *args, **kwargs): - self.engine_id = str(uuid.uuid4()) - self.qm = QuotaManager() - self.TG = scheduler.ThreadGroupManager() - self.periodic_enable = cfg.CONF.scheduler.periodic_enable - self.periodic_interval = cfg.CONF.scheduler.periodic_interval - LOG.debug(_('Engine initialization...')) - - super(EngineManager, self).__init__(service_name="engine_manager", - *args, **kwargs) - if self.periodic_enable: - LOG.debug("Adding periodic tasks for the engine to perform") - self.TG.add_timer(self.periodic_interval, - self.periodic_balance_all, None, self.engine_id) - - def init_host(self): - LOG.debug(_('Engine init_host...')) - - pass - - def cleanup_host(self): - LOG.debug(_('Engine cleanup_host...')) - - pass - - def pre_start_hook(self): - LOG.debug(_('Engine pre_start_hook...')) - - pass - - def post_start_hook(self): - LOG.debug(_('Engine post_start_hook...')) - - pass - - def periodic_balance_all(self, engine_id): - # Automated Quota Sync for all the keystone projects - LOG.info(_LI("Periodic quota sync job started at: %s"), - time.strftime("%c")) - self.qm.periodic_balance_all(engine_id) - - def quota_sync_for_project(self, ctx, project_id): - # On Demand Quota Sync for a project, will be triggered by KB-API - LOG.info(_LI("On Demand Quota Sync Called for: %s"), - project_id) - self.qm.quota_sync_for_project(project_id) - - def get_total_usage_for_tenant(self, ctx, project_id): - # Returns a dictionary containing nova, neutron & - # cinder usages for the project - LOG.info(_LI("Get total tenant usage called for: %s"), - project_id) - return self.qm.get_total_usage_for_tenant(project_id) - - -def list_opts(): - yield scheduler_opt_group.name, scheduler_opts diff --git a/kingbird/engine/quota_manager.py b/kingbird/engine/quota_manager.py index 8e82dd4..f79f76b 100644 --- a/kingbird/engine/quota_manager.py +++ b/kingbird/engine/quota_manager.py @@ -83,7 +83,7 @@ class QuotaManager(manager.Manager): # Divide list of projects into batches and perfrom quota sync # for one batch at a time. for current_batch_projects in utils.get_batch_projects( - cfg.CONF.batch.batch_size, project_list): + cfg.CONF.batch.batch_size, project_list): LOG.info(_LI("Syncing quota for current batch with projects: %s"), current_batch_projects) for current_project in current_batch_projects: diff --git a/kingbird/engine/scheduler.py b/kingbird/engine/scheduler.py index 65b4632..058d2e1 100644 --- a/kingbird/engine/scheduler.py +++ b/kingbird/engine/scheduler.py @@ -1,5 +1,3 @@ -# Copyright 2016 Ericsson AB - # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -24,9 +22,6 @@ LOG = logging.getLogger(__name__) wallclock = time.time -CONF = cfg.CONF -LOG = logging.getLogger(__name__) - class ThreadGroupManager(object): '''Thread group manager.''' diff --git a/kingbird/engine/service.py b/kingbird/engine/service.py index 411430e..3423fc8 100644 --- a/kingbird/engine/service.py +++ b/kingbird/engine/service.py @@ -1,89 +1,140 @@ -# Copyright 2016 Ericsson AB +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import six +import time +import uuid +import functools from oslo_config import cfg from oslo_log import log as logging -from oslo_service import service as srv +import oslo_messaging + +from kingbird.common import consts +from kingbird.common import context +from kingbird.common import exceptions from kingbird.common.i18n import _ -from kingbird.common.serializer import KingbirdSerializer as Serializer -from kingbird.common.service import Service -from kingbird.common import topics -from kingbird.engine.listener import EngineManager +from kingbird.common.i18n import _LE +from kingbird.common.i18n import _LI +from kingbird.common import messaging as rpc_messaging +from kingbird.engine.quota_manager import QuotaManager -_TIMER_INTERVAL = 30 -_TIMER_INTERVAL_MAX = 60 +from kingbird.engine import scheduler + +from oslo_service import service CONF = cfg.CONF LOG = logging.getLogger(__name__) -host_opts = [ - cfg.StrOpt('host', - default='localhost', - help='hostname of the machine') -] -host_opt_group = cfg.OptGroup('host_details') -cfg.CONF.register_group(host_opt_group) -cfg.CONF.register_opts(host_opts, group=host_opt_group) +def request_context(func): + @functools.wraps(func) + def wrapped(self, ctx, *args, **kwargs): + if ctx is not None and not isinstance(ctx, context.RequestContext): + ctx = context.RequestContext.from_dict(ctx.to_dict()) + try: + return func(self, ctx, *args, **kwargs) + except exceptions.KingbirdException: + raise oslo_messaging.rpc.dispatcher.ExpectedException() + + return wrapped -class EngineService(Service): - def __init__(self, host, binary, topic, manager, report_interval=None, - periodic_enable=None, periodic_fuzzy_delay=None, - periodic_interval_max=None, serializer=None, - *args, **kwargs): - super(EngineService, self).__init__(host, binary, topic, manager, - report_interval, periodic_enable, - periodic_fuzzy_delay, - periodic_interval_max, serializer, - *args, **kwargs) +class EngineService(service.Service): + '''Lifecycle manager for a running service engine. + - All the methods in here are called from the RPC client. + - If a RPC call does not have a corresponding method here, an exceptions + will be thrown. + - Arguments to these calls are added dynamically and will be treated as + keyword arguments by the RPC client. + ''' -def create_service(): + def __init__(self, host, topic, manager=None): - LOG.debug(_('create KB engine service')) + super(EngineService, self).__init__() + self.host = cfg.CONF.host + self.rpc_api_version = consts.RPC_API_VERSION + self.topic = consts.TOPIC_KB_ENGINE + # The following are initialized here, but assigned in start() which + # happens after the fork when spawning multiple worker processes + self.engine_id = None + self.TG = None + self.periodic_enable = cfg.CONF.scheduler.periodic_enable + self.periodic_interval = cfg.CONF.scheduler.periodic_interval + self.target = None + self._rpc_server = None + self.qm = None - engine_manager = EngineManager() - engine_service = EngineService( - host=cfg.CONF.host_details.host, - binary="kb_engine", - topic=topics.TOPIC_KB_ENGINE, - manager=engine_manager, - periodic_enable=True, - report_interval=_TIMER_INTERVAL, - periodic_interval_max=_TIMER_INTERVAL_MAX, - serializer=Serializer() - ) + def init_tgm(self): + self.TG = scheduler.ThreadGroupManager() - engine_service.start() + def init_qm(self): + self.qm = QuotaManager() - return engine_service + def start(self): + self.engine_id = str(uuid.uuid4()) + self.init_tgm() + self.init_qm() + target = oslo_messaging.Target(version=self.rpc_api_version, + server=self.host, + topic=self.topic) + self.target = target + self._rpc_server = rpc_messaging.get_rpc_server(self.target, self) + self._rpc_server.start() + super(EngineService, self).start() + if self.periodic_enable: + LOG.info("Adding periodic tasks for the engine to perform") + self.TG.add_timer(self.periodic_interval, + self.periodic_balance_all, None, self.engine_id) -_launcher = None + def periodic_balance_all(self, engine_id): + # Automated Quota Sync for all the keystone projects + LOG.info(_LI("Periodic quota sync job started at: %s"), + time.strftime("%c")) + self.qm.periodic_balance_all(engine_id) + @request_context + def get_total_usage_for_tenant(self, context, project_id): + # Returns a dictionary containing nova, neutron & + # cinder usages for the project + LOG.info(_LI("Get total tenant usage called for: %s"), + project_id) + return self.qm.get_total_usage_for_tenant(project_id) -def serve(engine_service, workers=1): - global _launcher - if _launcher: - raise RuntimeError(_('serve() can only be called once')) + @request_context + def quota_sync_for_project(self, context, project_id): + # On Demand Quota Sync for a project, will be triggered by KB-API + LOG.info(_LI("On Demand Quota Sync Called for: %s"), + project_id) + self.qm.quota_sync_for_project(project_id) - _launcher = srv.launch(CONF, engine_service, workers=workers) + def _stop_rpc_server(self): + # Stop RPC connection to prevent new requests + LOG.debug(_("Attempting to stop engine service...")) + try: + self._rpc_server.stop() + self._rpc_server.wait() + LOG.info(_LI('Engine service stopped successfully')) + except Exception as ex: + LOG.error(_LE('Failed to stop engine service: %s'), + six.text_type(ex)) + def stop(self): + self._stop_rpc_server() -def wait(): - _launcher.wait() + self.TG.stop() + # Terminate the engine process + LOG.info(_LI("All threads were gone, terminating engine")) + super(EngineService, self).stop() diff --git a/kingbird/rpc/__init__.py b/kingbird/rpc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kingbird/rpc/client.py b/kingbird/rpc/client.py new file mode 100644 index 0000000..87f3898 --- /dev/null +++ b/kingbird/rpc/client.py @@ -0,0 +1,70 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +''' +Client side of the Kingbird RPC API. +''' + +from oslo_config import cfg +from oslo_log import log as logging + +from kingbird.common import config +from kingbird.common import consts +from kingbird.common import messaging + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +config.register_options() + + +class EngineClient(object): + """Client side of the kingbird engine rpc API. + + Version History: + 1.0 - Initial version (Mitaka 1.0 release) + """ + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self): + self._client = messaging.get_rpc_client( + topic=consts.TOPIC_KB_ENGINE, + server=CONF.host, + version=self.BASE_RPC_API_VERSION) + + @staticmethod + def make_msg(method, **kwargs): + return method, kwargs + + def call(self, ctxt, msg, version=None): + method, kwargs = msg + if version is not None: + client = self._client.prepare(version=version) + else: + client = self._client + return client.call(ctxt, method, **kwargs) + + def cast(self, ctxt, msg, version=None): + method, kwargs = msg + if version is not None: + client = self._client.prepare(version=version) + else: + client = self._client + return client.cast(ctxt, method, **kwargs) + + def get_total_usage_for_tenant(self, ctxt, project_id): + return self.call(ctxt, self.make_msg('get_total_usage_for_tenant', + project_id=project_id)) + + def quota_sync_for_project(self, ctxt, project_id): + return self.cast(ctxt, self.make_msg('quota_sync_for_project', + project_id=project_id)) diff --git a/kingbird/tests/unit/api/test_quota_manager.py b/kingbird/tests/unit/api/test_quota_manager.py index b9d5b0d..b61af98 100644 --- a/kingbird/tests/unit/api/test_quota_manager.py +++ b/kingbird/tests/unit/api/test_quota_manager.py @@ -20,7 +20,7 @@ from oslo_config import cfg from kingbird.api.controllers import quota_manager from kingbird.common import config -from kingbird.common import rpc +from kingbird.rpc import client as rpc_client from kingbird.tests.unit.api.testroot import KBApiTest config.register_options() OPT_GROUP_NAME = 'keystone_authtoken' @@ -40,7 +40,7 @@ class TestQuotaManager(KBApiTest): cfg.CONF.set_override('admin_tenant', 'fake_tenant_id', group='cache') - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') def test_get_all_admin(self, mock_db_api): Res = Result('tenant_1', 'ram', 100) @@ -54,9 +54,9 @@ class TestQuotaManager(KBApiTest): self.assertEqual({'quota_set': {'project_id': 'tenant_1', 'ram': 100}}, eval(response.text)) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_get_default_admin(self, mock_db_api, mock_client): + def test_get_default_admin(self, mock_db_api): mock_db_api.quota_class_get_default.return_value = \ {'class_name': 'default'} response = self.app.get( @@ -69,16 +69,20 @@ class TestQuotaManager(KBApiTest): cfg.CONF.kingbird_global_limit['quota_' + resource], result['quota_set'][resource]) - @mock.patch.object(rpc, 'get_client') - def test_get_usages_admin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient') + def test_get_usages_admin(self, mock_rpc_client): + expected_usage = {"ram": 10} + mock_rpc_client().get_total_usage_for_tenant.return_value = \ + expected_usage response = self.app.get( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/detail', headers={'X_ROLE': 'admin'}) self.assertEqual(response.status_int, 200) + self.assertEqual(eval(response.body), {"quota_set": expected_usage}) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_put_admin(self, mock_db_api, mock_client): + def test_put_admin(self, mock_db_api): Res = Result('tenant_1', 'cores', 10) mock_db_api.quota_update.return_value = Res data = {"quota_set": {Res.resource: Res.hard_limit}} @@ -90,9 +94,9 @@ class TestQuotaManager(KBApiTest): self.assertEqual({Res.project_id: {Res.resource: Res.hard_limit}}, eval(response.text)) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_delete_admin(self, mock_db_api, mock_client): + def test_delete_admin(self, mock_db_api): Res = Result('tenant_1', 'cores', 10) mock_db_api.quota_destroy.return_value = Res data = {"quota_set": [Res.resource]} @@ -104,9 +108,9 @@ class TestQuotaManager(KBApiTest): self.assertEqual({'Deleted quota limits': [Res.resource]}, eval(response.text)) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_delete_all_admin(self, mock_db_api, mock_client): + def test_delete_all_admin(self, mock_db_api): Res = Result('tenant_1', 'cores', 10) mock_db_api.quota_destroy_all.return_value = Res response = self.app.delete_json( @@ -116,8 +120,8 @@ class TestQuotaManager(KBApiTest): self.assertEqual('Deleted all quota limits for the given project', eval(response.text)) - @mock.patch.object(rpc, 'get_client') - def test_quota_sync_admin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_quota_sync_admin(self): response = self.app.put_json( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync', headers={'X-Tenant-Id': 'fake_tenant', @@ -126,8 +130,8 @@ class TestQuotaManager(KBApiTest): self.assertEqual("triggered quota sync for tenant_1", eval(response.text)) - @mock.patch.object(rpc, 'get_client') - def test_put_nonadmin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_put_nonadmin(self): Res = Result('tenant_1', 'cores', 10) data = {"quota_set": {Res.resource: Res.hard_limit}} try: @@ -137,16 +141,16 @@ class TestQuotaManager(KBApiTest): except webtest.app.AppError as admin_exception: self.assertIn('Admin required', admin_exception.message) - @mock.patch.object(rpc, 'get_client') - def test_delete_all_nonadmin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_delete_all_nonadmin(self): try: self.app.delete_json('/v1.0/fake_tenant_id/os-quota-sets/tenant_1', headers={'X-Tenant-Id': 'fake_tenant'}) except webtest.app.AppError as admin_exception: self.assertIn('Admin required', admin_exception.message) - @mock.patch.object(rpc, 'get_client') - def test_delete_nonadmin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_delete_nonadmin(self): Res = Result('tenant_1', 'cores', 10) data = {"quota_set": {Res.resource: Res.hard_limit}} try: @@ -156,8 +160,8 @@ class TestQuotaManager(KBApiTest): except webtest.app.AppError as admin_exception: self.assertIn('Admin required', admin_exception.message) - @mock.patch.object(rpc, 'get_client') - def test_quota_sync_nonadmin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_quota_sync_nonadmin(self): try: self.app.put_json( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync', @@ -165,7 +169,7 @@ class TestQuotaManager(KBApiTest): except webtest.app.AppError as admin_exception: self.assertIn('Admin required', admin_exception.message) - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') def test_get_all_nonadmin(self, mock_db_api): Res = Result('tenant_1', 'ram', 100) @@ -179,9 +183,9 @@ class TestQuotaManager(KBApiTest): self.assertEqual({'quota_set': {'project_id': 'tenant_1', 'ram': 100}}, eval(response.text)) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_get_default_nonadmin(self, mock_db_api, mock_client): + def test_get_default_nonadmin(self, mock_db_api): mock_db_api.quota_class_get_default.return_value = \ {'class_name': 'default'} response = self.app.get( @@ -194,8 +198,8 @@ class TestQuotaManager(KBApiTest): cfg.CONF.kingbird_global_limit['quota_' + resource], result['quota_set'][resource]) - @mock.patch.object(rpc, 'get_client') - def test_quota_sync_bad_request(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_quota_sync_bad_request(self): try: self.app.post_json( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync', @@ -205,9 +209,9 @@ class TestQuotaManager(KBApiTest): self.assertIn('Bad response: 404 Not Found', bad_method_exception.message) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_put_invalid_payload(self, mock_db_api, mock_client): + def test_put_invalid_payload(self, mock_db_api): Res = Result('tenant_1', 'cores', 10) mock_db_api.quota_update.return_value = Res data = {'quota': {Res.resource: Res.hard_limit}} @@ -220,9 +224,9 @@ class TestQuotaManager(KBApiTest): self.assertIn('400 Bad Request', invalid_payload_exception.message) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_put_invalid_input(self, mock_db_api, mock_client): + def test_put_invalid_input(self, mock_db_api): Res = Result('tenant_1', 'cores', -10) mock_db_api.quota_update.return_value = Res data = {"quota_set": {Res.resource: Res.hard_limit}} @@ -235,9 +239,9 @@ class TestQuotaManager(KBApiTest): self.assertIn('400 Bad Request', invalid_input_exception.message) - @mock.patch.object(rpc, 'get_client') + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) @mock.patch.object(quota_manager, 'db_api') - def test_delete_invalid_quota(self, mock_db_api, mock_client): + def test_delete_invalid_quota(self, mock_db_api): Res = Result('tenant_1', 'invalid_quota', 10) mock_db_api.quota_destroy.return_value = Res data = {"quota_set": [Res.resource]} @@ -250,15 +254,19 @@ class TestQuotaManager(KBApiTest): self.assertIn('The resource could not be found', invalid_quota_exception.message) - @mock.patch.object(rpc, 'get_client') - def test_get_usages_nonadmin(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient') + def test_get_usages_nonadmin(self, mock_rpc_client): + expected_usage = {"ram": 10} + mock_rpc_client().get_total_usage_for_tenant.return_value = \ + expected_usage response = self.app.get( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/detail', headers={'X_TENANT_ID': 'fake_tenant', 'X_USER_ID': 'nonadmin'}) self.assertEqual(response.status_int, 200) + self.assertEqual(eval(response.body), {"quota_set": expected_usage}) - @mock.patch.object(rpc, 'get_client') - def test_quota_sync_bad_action(self, mock_client): + @mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock()) + def test_quota_sync_bad_action(self): try: self.app.put_json( '/v1.0/fake_tenant_id/os-quota-sets/tenant_1/syncing', diff --git a/kingbird/tests/unit/api/testroot.py b/kingbird/tests/unit/api/testroot.py index 9cff3ae..3b6677b 100644 --- a/kingbird/tests/unit/api/testroot.py +++ b/kingbird/tests/unit/api/testroot.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import mock - import pecan from pecan.configuration import set_config from pecan.testing import load_test_app @@ -26,7 +24,6 @@ from oslo_utils import uuidutils from kingbird.api import api_config from kingbird.common import config -from kingbird.common import rpc from kingbird.tests import base config.register_options() @@ -108,7 +105,6 @@ class TestRootController(KBApiTest): class TestV1Controller(KBApiTest): - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_get(self): response = self.app.get('/v1.0') self.assertEqual(response.status_int, 200) @@ -129,23 +125,18 @@ class TestV1Controller(KBApiTest): response = api_method('/v1.0', expect_errors=True) self.assertEqual(response.status_int, 405) - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_post(self): self._test_method_returns_405('post') - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_put(self): self._test_method_returns_405('put') - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_patch(self): self._test_method_returns_405('patch') - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_delete(self): self._test_method_returns_405('delete') - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_head(self): self._test_method_returns_405('head') @@ -161,7 +152,6 @@ class TestErrors(KBApiTest): response = self.app.get('/assert_called_once', expect_errors=True) self.assertEqual(response.status_int, 404) - @mock.patch.object(rpc, 'get_client', new=mock.Mock()) def test_bad_method(self): response = self.app.patch('/v1.0/fake_tenant_id/bad_method', expect_errors=True) diff --git a/kingbird/tests/unit/engine/test_listener.py b/kingbird/tests/unit/engine/test_listener.py deleted file mode 100644 index 68868eb..0000000 --- a/kingbird/tests/unit/engine/test_listener.py +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -import uuid - -from kingbird.engine import listener -from kingbird.tests import base -from kingbird.tests import utils - -FAKE_PROJECT = 'fake_project' -FAKE_ENGINE_ID = str(uuid.uuid4()) - - -class TestEngineManager(base.KingbirdTestCase): - def setUp(self): - super(TestEngineManager, self).setUp() - self.context = utils.dummy_context() - - @mock.patch.object(listener, 'QuotaManager') - def test_init(self, mock_qm): - engine_manager = listener.EngineManager() - self.assertIsNotNone(engine_manager) - self.assertEqual(engine_manager.qm, mock_qm()) - - @mock.patch.object(listener, 'QuotaManager') - def test_periodic_balance_all(self, mock_qm): - engine_manager = listener.EngineManager() - engine_manager.periodic_balance_all(FAKE_ENGINE_ID) - mock_qm().periodic_balance_all.assert_called_once_with(FAKE_ENGINE_ID) - - @mock.patch.object(listener, 'QuotaManager') - def test_quota_sync_for_project(self, mock_qm): - engine_manager = listener.EngineManager() - engine_manager.quota_sync_for_project(self.context, FAKE_PROJECT) - mock_qm().quota_sync_for_project.assert_called_once_with(FAKE_PROJECT) - - @mock.patch.object(listener, 'QuotaManager') - def test_get_total_usage_for_tenant(self, mock_qm): - engine_manager = listener.EngineManager() - engine_manager.get_total_usage_for_tenant(self.context, FAKE_PROJECT) - mock_qm().get_total_usage_for_tenant.assert_called_once_with( - FAKE_PROJECT) diff --git a/kingbird/tests/unit/engine/test_service.py b/kingbird/tests/unit/engine/test_service.py index 20d2b81..ba3d47d 100644 --- a/kingbird/tests/unit/engine/test_service.py +++ b/kingbird/tests/unit/engine/test_service.py @@ -12,9 +12,10 @@ import mock -from kingbird.engine import listener +from kingbird.engine import scheduler from kingbird.engine import service from kingbird.tests import base +from kingbird.tests import utils from oslo_config import cfg CONF = cfg.CONF @@ -23,34 +24,74 @@ CONF = cfg.CONF class TestEngineService(base.KingbirdTestCase): def setUp(self): super(TestEngineService, self).setUp() + self.tenant_id = 'fake_admin' + self.thm = scheduler.ThreadGroupManager() + self.context = utils.dummy_context(user='test_user', + tenant=self.tenant_id) + self.service_obj = service.EngineService('kingbird', + 'kingbird-engine') - @mock.patch.object(listener, 'QuotaManager') - def test_init(self, mock_qm): - manager = listener.EngineManager() - engine_service = service.EngineService('127.0.0.1', 'engine', - 'topic-A', manager) - self.assertIsNotNone(engine_service) + def test_init(self): + self.assertEqual(self.service_obj.host, 'localhost') + self.assertEqual(self.service_obj.topic, 'kingbird-engine') + self.assertEqual(self.service_obj.periodic_enable, + CONF.scheduler.periodic_enable) + self.assertEqual(self.service_obj.periodic_interval, + CONF.scheduler.periodic_interval) + def test_init_tgm(self): + self.service_obj.init_tgm() + self.assertIsNotNone(self.service_obj.TG) -@mock.patch.object(service, 'EngineService') -@mock.patch.object(listener, 'QuotaManager') -def test_create_service(mock_qm, mock_engine): - service.create_service() - mock_engine().start.assert_called_once_with() + @mock.patch.object(service, 'QuotaManager') + def test_init_qm(self, mock_quota_manager): + self.service_obj.init_qm() + self.assertIsNotNone(self.service_obj.qm) + @mock.patch.object(service, 'QuotaManager') + @mock.patch.object(service, 'rpc_messaging') + def test_start(self, mock_rpc, mock_quota_manager): + self.service_obj.start() + mock_rpc.get_rpc_server.assert_called_once_with( + self.service_obj.target, self.service_obj) + mock_rpc.get_rpc_server().start.assert_called_once_with() -@mock.patch.object(listener, 'QuotaManager') -@mock.patch.object(service, 'EngineService') -@mock.patch.object(service, 'srv') -def test_serve(mock_srv, mock_engine, mock_qm): - manager = listener.EngineManager() - engine_service = service.EngineService('127.0.0.1', 'engine', - 'topic-A', manager) - service.serve(engine_service, 2) - mock_srv.launch.assert_called_once_with(CONF, engine_service, workers=2) + @mock.patch.object(service, 'QuotaManager') + def test_periodic_balance_all(self, mock_quota_manager): + self.service_obj.init_tgm() + self.service_obj.init_qm() + self.service_obj.periodic_balance_all(self.service_obj.engine_id) + mock_quota_manager().periodic_balance_all.\ + assert_called_once_with(self.service_obj.engine_id) + @mock.patch.object(service, 'QuotaManager') + def test_get_total_usage_for_tenant(self, mock_quota_manager): + self.service_obj.init_tgm() + self.service_obj.init_qm() + self.service_obj.get_total_usage_for_tenant( + self.context, self.tenant_id) + mock_quota_manager().get_total_usage_for_tenant.\ + assert_called_once_with(self.tenant_id) -@mock.patch.object(service, '_launcher') -def test_wait(mock_launcher): - service.wait() - mock_launcher.wait.assert_called_once_with() + @mock.patch.object(service, 'QuotaManager') + def test_quota_sync_for_project(self, mock_quota_manager): + self.service_obj.init_tgm() + self.service_obj.init_qm() + self.service_obj.quota_sync_for_project( + self.context, self.tenant_id) + mock_quota_manager().quota_sync_for_project.\ + assert_called_once_with(self.tenant_id) + + @mock.patch.object(service, 'QuotaManager') + @mock.patch.object(service, 'rpc_messaging') + def test_stop_rpc_server(self, mock_rpc, mock_quota_manager): + self.service_obj.start() + self.service_obj._stop_rpc_server() + mock_rpc.get_rpc_server().stop.assert_called_once_with() + + @mock.patch.object(service, 'QuotaManager') + @mock.patch.object(service, 'rpc_messaging') + def test_stop(self, mock_rpc, mock_quota_manager): + self.service_obj.start() + self.service_obj.stop() + mock_rpc.get_rpc_server().stop.assert_called_once_with() diff --git a/kingbird/tests/unit/rpc/__init__.py b/kingbird/tests/unit/rpc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kingbird/tests/unit/rpc/test_rpc_client.py b/kingbird/tests/unit/rpc/test_rpc_client.py new file mode 100644 index 0000000..1a1f9e2 --- /dev/null +++ b/kingbird/tests/unit/rpc/test_rpc_client.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import mock + +from kingbird.common import config +from kingbird.common import messaging +from kingbird.rpc import client as rpc_client +from kingbird.tests import base +from kingbird.tests import utils + +config.register_options() + + +class EngineRpcAPITestCase(base.KingbirdTestCase): + + def setUp(self): + messaging.setup("fake://", optional=True) + self.addCleanup(messaging.cleanup) + self.context = utils.dummy_context() + # self.stubs = stubout.StubOutForTesting() + self.rpcapi = rpc_client.EngineClient() + super(EngineRpcAPITestCase, self).setUp() + + @mock.patch.object(messaging, 'get_rpc_client') + def test_call(self, mock_client): + client = mock.Mock() + mock_client.return_value = client + + method = 'fake_method' + kwargs = {'key': 'value'} + rpcapi = rpc_client.EngineClient() + msg = rpcapi.make_msg(method, **kwargs) + + # with no version + res = rpcapi.call(self.context, msg) + + self.assertEqual(client, rpcapi._client) + client.call.assert_called_once_with(self.context, 'fake_method', + key='value') + self.assertEqual(res, client.call.return_value) + + # with version + res = rpcapi.call(self.context, msg, version='123') + client.prepare.assert_called_once_with(version='123') + new_client = client.prepare.return_value + new_client.call.assert_called_once_with(self.context, 'fake_method', + key='value') + self.assertEqual(res, new_client.call.return_value) + + @mock.patch.object(messaging, 'get_rpc_client') + def test_cast(self, mock_client): + client = mock.Mock() + mock_client.return_value = client + + method = 'fake_method' + kwargs = {'key': 'value'} + rpcapi = rpc_client.EngineClient() + msg = rpcapi.make_msg(method, **kwargs) + + # with no version + res = rpcapi.cast(self.context, msg) + + self.assertEqual(client, rpcapi._client) + client.cast.assert_called_once_with(self.context, 'fake_method', + key='value') + self.assertEqual(res, client.cast.return_value) + + # with version + res = rpcapi.cast(self.context, msg, version='123') + client.prepare.assert_called_once_with(version='123') + new_client = client.prepare.return_value + new_client.cast.assert_called_once_with(self.context, 'fake_method', + key='value') + self.assertEqual(res, new_client.cast.return_value) diff --git a/kingbird/tests/utils.py b/kingbird/tests/utils.py index 9b79a04..d96577d 100644 --- a/kingbird/tests/utils.py +++ b/kingbird/tests/utils.py @@ -82,7 +82,7 @@ def create_quota_limit(ctxt, **kwargs): def dummy_context(user='test_username', tenant='test_project_id', region_name=None): - return context.ContextBase.from_dict({ + return context.RequestContext.from_dict({ 'auth_token': 'abcd1234', 'user': user, 'tenant': tenant,