RPC refactoring

Fix for the bug where api is not able to
send request to engine over RPC.
Modified and added UTs wherever required.

Change-Id: I44d5d3561427c6e055b45a3e474d4d07bcf60810
Closes-Bug: #1598235
This commit is contained in:
Ashish Singh 2016-07-01 22:05:58 +05:30
parent c60096692c
commit 819ed58e2c
28 changed files with 722 additions and 883 deletions

View File

@ -29,7 +29,6 @@ from kingbird.common.i18n import _LI
# from kingbird import policy
from kingbird.common import rpc
from kingbird.common import version
LOG = logging.getLogger(__name__)
@ -73,8 +72,6 @@ def init(args, **kwargs):
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
rpc.init(cfg.CONF)
def setup_logging():
"""Sets up the logging options for a log with supplied name."""

View File

@ -17,7 +17,6 @@ import collections
from oslo_config import cfg
from oslo_log import log as logging
from oslo_log import versionutils
import oslo_messaging as messaging
import pecan
from pecan import expose
from pecan import request
@ -27,11 +26,9 @@ import six
from kingbird.common import exceptions
from kingbird.common.i18n import _
from kingbird.common import rpc
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common import topics
from kingbird.common import utils
from kingbird.db.sqlalchemy import api as db_api
from kingbird.rpc import client as rpc_client
CONF = cfg.CONF
@ -57,18 +54,7 @@ class QuotaManagerController(object):
def __init__(self, *args, **kwargs):
super(QuotaManagerController, self).__init__(*args, **kwargs)
target = messaging.Target(topic=topics.TOPIC_KB_ENGINE, version='1.0')
upgrade_level = CONF.upgrade_levels.kb_engine
version_cap = 1.0
if upgrade_level == 'auto':
version_cap = self._determine_version_cap(target)
else:
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = Serializer()
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
self.rpc_client = rpc_client.EngineClient()
# to do the version compatibility for future purpose
def _determine_version_cap(self, target):
@ -95,10 +81,8 @@ class QuotaManagerController(object):
pecan.abort(404, _('Invalid request URL'))
elif action == 'detail':
# Get the current quota usages for a project
result = self.client.call(
context,
'get_total_usage_for_tenant',
project_id=project_id)
result = self.rpc_client.get_total_usage_for_tenant(
context, project_id)
else:
# Get quota limits for all the resources for a project
result = db_api.quota_get_all_by_project(
@ -184,8 +168,8 @@ class QuotaManagerController(object):
if not context.is_admin:
pecan.abort(403, _('Admin required'))
self.client.cast(context, 'quota_sync_for_project',
project_id=project_id)
self.rpc_client.quota_sync_for_project(
context, project_id)
return 'triggered quota sync for ' + project_id
@staticmethod

View File

@ -37,4 +37,4 @@ def extract_context_from_environ():
role = environ.get('HTTP_X_ROLE')
context_paras['is_admin'] = role == 'admin'
return k_context.Context(**context_paras)
return k_context.RequestContext(**context_paras)

7
kingbird/cmd/api.py Executable file → Normal file
View File

@ -19,8 +19,10 @@
import sys
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import systemd
from oslo_service import wsgi
import logging as std_logging
@ -31,10 +33,12 @@ from kingbird.api import app
from kingbird.common import config
from kingbird.common.i18n import _LI
from kingbird.common.i18n import _LW
from kingbird.common import messaging
CONF = cfg.CONF
config.register_options()
LOG = logging.getLogger('kingbird.api')
eventlet.monkey_patch(os=False)
def main():
@ -52,7 +56,8 @@ def main():
LOG.info(_LI("Server on http://%(host)s:%(port)s with %(workers)s"),
{'host': host, 'port': port, 'workers': workers})
messaging.setup()
systemd.notify_once()
service = wsgi.Server(CONF, "Kingbird", application, host, port)
app.serve(service, CONF, workers)

74
kingbird/cmd/engine.py Executable file → Normal file
View File

@ -1,64 +1,54 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Kingbird Engine Server.
"""
import eventlet
if __name__ == "__main__":
eventlet.monkey_patch()
import sys
eventlet.monkey_patch()
from oslo_config import cfg
from oslo_i18n import _lazy
from oslo_log import log as logging
import logging as std_logging
from oslo_service import service
from kingbird.common import config
from kingbird.common import consts
from kingbird.common import messaging
from kingbird.common.i18n import _LI
from kingbird.common.i18n import _LW
from kingbird.engine import engine_config
from kingbird.engine import service
CONF = cfg.CONF
_lazy.enable_lazy()
config.register_options()
LOG = logging.getLogger('kingbird.engine')
def main():
engine_config.init(sys.argv[1:])
engine_config.setup_logging()
logging.register_options(cfg.CONF)
cfg.CONF(project='kingbird', prog='kingbird-engine')
logging.setup(cfg.CONF, 'kingbird-engine')
logging.set_defaults()
messaging.setup()
host = CONF.host
workers = CONF.workers
if workers < 1:
LOG.warning(_LW("Wrong worker number, worker = %(workers)s"), workers)
workers = 1
LOG.info(_LI("Server on http://%(host)s with %(workers)s"),
{'host': host, 'workers': workers})
engine_service = service.create_service()
service.serve(engine_service, workers)
service.wait()
LOG.info(_LI("Configuration:"))
CONF.log_opt_values(LOG, std_logging.INFO)
from kingbird.engine import service as engine
srv = engine.EngineService(cfg.CONF.host,
consts.TOPIC_KB_ENGINE)
launcher = service.launch(cfg.CONF,
srv, workers=cfg.CONF.workers)
# the following periodic tasks are intended serve as HA checking
# srv.create_periodic_tasks()
launcher.wait()
if __name__ == '__main__':
main()

View File

@ -128,6 +128,27 @@ cache_opts = [
' auto_refresh_endpoint set to True')
]
scheduler_opts = [
cfg.BoolOpt('periodic_enable',
default=True,
help='boolean value for enable/disenable periodic tasks'),
cfg.IntOpt('periodic_interval',
default=900,
help='periodic time interval for automatic quota sync job')
]
common_opts = [
cfg.IntOpt('workers', default=1,
help='number of workers'),
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
scheduler_opt_group = cfg.OptGroup('scheduler',
title='Scheduler options for periodic job')
# The group stores Kingbird global limit for all the projects
default_quota_group = cfg.OptGroup(name='kingbird_global_limit',
title='Global quota limit for all projects')
@ -141,7 +162,9 @@ def list_opts():
yield default_quota_group.name, neutron_quotas
yield default_quota_group.name, cinder_quotas
yield cache_opt_group.name, cache_opts
yield scheduler_opt_group.name, scheduler_opts
yield None, global_opts
yield None, common_opts
def register_options():

View File

@ -35,3 +35,7 @@ NEUTRON_QUOTA_FIELDS = ("network",
"security_group",
"security_group_rule",
)
RPC_API_VERSION = "1.0"
TOPIC_KB_ENGINE = "kingbird-engine"

View File

@ -1,84 +1,124 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as oslo_ctx
from oslo_context import context as base_context
from oslo_utils import encodeutils
from kingbird.common import policy
from kingbird.db import api as db_api
class ContextBase(oslo_ctx.RequestContext):
def __init__(self, auth_token=None, user_id=None, tenant_id=None,
is_admin=False, request_id=None, overwrite=True,
user_name=None, tenant_name=None, auth_url=None,
region=None, password=None, domain='default',
project_name=None, **kwargs):
super(ContextBase, self).__init__(
auth_token=auth_token,
user=user_id or kwargs.get('user', None),
tenant=tenant_id or kwargs.get('tenant', None),
domain=kwargs.get('domain', None),
user_domain=kwargs.get('user_domain', None),
project_domain=kwargs.get('project_domain', None),
is_admin=is_admin,
read_only=kwargs.get('read_only', False),
show_deleted=kwargs.get('show_deleted', False),
request_id=request_id,
resource_uuid=kwargs.get('resource_uuid', None),
overwrite=overwrite)
self.user_name = user_name
self.tenant_name = tenant_name
self.tenant_id = tenant_id
self.auth_url = auth_url
self.password = password
self.default_name = domain
self.region_name = region
self.project_name = project_name
class RequestContext(base_context.RequestContext):
'''Stores information about the security context.
def to_dict(self):
ctx_dict = super(ContextBase, self).to_dict()
ctx_dict.update({
'user_name': self.user_name,
'tenant_name': self.tenant_name,
'project_name': self.project_name,
'tenant_id': self.tenant_id,
'auth_url': self.auth_url,
'password': self.password,
'default_name': self.default_name,
'region_name': self.region_name,
})
return ctx_dict
The context encapsulates information related to the user accessing the
the system, as well as additional request information.
'''
@classmethod
def from_dict(cls, ctx):
return cls(**ctx)
def __init__(self, auth_token=None, user=None, project=None,
domain=None, user_domain=None, project_domain=None,
is_admin=None, read_only=False, show_deleted=False,
request_id=None, auth_url=None, trusts=None,
user_name=None, project_name=None, domain_name=None,
user_domain_name=None, project_domain_name=None,
auth_token_info=None, region_name=None, roles=None,
password=None, **kwargs):
'''Initializer of request context.'''
# We still have 'tenant' param because oslo_context still use it.
super(RequestContext, self).__init__(
auth_token=auth_token, user=user, tenant=project,
domain=domain, user_domain=user_domain,
project_domain=project_domain,
read_only=read_only, show_deleted=show_deleted,
request_id=request_id)
class Context(ContextBase):
def __init__(self, **kwargs):
super(Context, self).__init__(**kwargs)
# request_id might be a byte array
self.request_id = encodeutils.safe_decode(self.request_id)
# we save an additional 'project' internally for use
self.project = project
# Session for DB access
self._session = None
self.auth_url = auth_url
self.trusts = trusts
self.user_name = user_name
self.project_name = project_name
self.domain_name = domain_name
self.user_domain_name = user_domain_name
self.project_domain_name = project_domain_name
self.auth_token_info = auth_token_info
self.region_name = region_name
self.roles = roles or []
self.password = password
# Check user is admin or not
if is_admin is None:
self.is_admin = policy.enforce(self, 'context_is_admin',
target={'project': self.project},
do_raise=False)
else:
self.is_admin = is_admin
@property
def session(self):
# todo get db session in the context
# if not self._session:
# self._session = dal.get_session()
if self._session is None:
self._session = db_api.get_session()
return self._session
def to_dict(self):
return {
'auth_url': self.auth_url,
'auth_token': self.auth_token,
'auth_token_info': self.auth_token_info,
'user': self.user,
'user_name': self.user_name,
'user_domain': self.user_domain,
'user_domain_name': self.user_domain_name,
'project': self.project,
'project_name': self.project_name,
'project_domain': self.project_domain,
'project_domain_name': self.project_domain_name,
'domain': self.domain,
'domain_name': self.domain_name,
'trusts': self.trusts,
'region_name': self.region_name,
'roles': self.roles,
'show_deleted': self.show_deleted,
'is_admin': self.is_admin,
'request_id': self.request_id,
'password': self.password,
}
def get_admin_context(read_only=True):
return ContextBase(user_id=None,
project_id=None,
is_admin=True,
overwrite=False,
read_only=read_only)
@classmethod
def from_dict(cls, values):
return cls(**values)
def get_admin_context(show_deleted=False):
return RequestContext(is_admin=True, show_deleted=show_deleted)
def get_service_context(**args):
'''An abstraction layer for getting service context.
There could be multiple cloud backends for kingbird to use. This
abstraction layer provides an indirection for kingbird to get the
credentials of 'kingbird' user on the specific cloud. By default,
this credential refers to the credentials built for keystone middleware
in an OpenStack cloud.
'''
pass

View File

@ -42,20 +42,12 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from kingbird.common import config
CONF = cfg.CONF
config.register_options()
LOG = logging.getLogger(__name__)
host_opts = [
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
host_opt_group = cfg.OptGroup('host_details')
cfg.CONF.register_group(host_opt_group)
cfg.CONF.register_opts(host_opts, group=host_opt_group)
class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
@ -66,7 +58,7 @@ class Manager(PeriodicTasks):
def __init__(self, host=None, service_name='undefined'):
if not host:
host = cfg.CONF.host_details.host
host = cfg.CONF.host
self.host = host
self.service_name = service_name
# self.notifier = rpc.get_notifier(self.service_name, self.host)
@ -123,7 +115,3 @@ class Manager(PeriodicTasks):
"""
pass
def list_opts():
yield host_opt_group.name, host_opts

View File

@ -0,0 +1,111 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils
from kingbird.common import context
TRANSPORT = None
NOTIFIER = None
_ALIASES = {
'kingbird.openstack.common.rpc.impl_kombu': 'rabbit',
'kingbird.openstack.common.rpc.impl_qpid': 'qpid',
'kingbird.openstack.common.rpc.impl_zmq': 'zmq',
}
class RequestContextSerializer(oslo_messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return ctxt.to_dict()
@staticmethod
def deserialize_context(ctxt):
return context.RequestContext.from_dict(ctxt)
class JsonPayloadSerializer(oslo_messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None, optional=False):
"""Initialise the oslo_messaging layer."""
global TRANSPORT, NOTIFIER
if url and url.startswith("fake://"):
# NOTE: oslo_messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
oslo_messaging.set_transport_defaults('kingbird')
exmods = ['kingbird.common.exception']
try:
TRANSPORT = oslo_messaging.get_transport(
cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES)
except oslo_messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
raise
if not NOTIFIER and TRANSPORT:
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
"""Cleanup the oslo_messaging layer."""
global TRANSPORT, NOTIFIER
if TRANSPORT:
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def get_rpc_server(target, endpoint):
"""Return a configured oslo_messaging rpc server."""
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
"""Return a configured oslo_messaging RPCClient."""
target = oslo_messaging.Target(**kwargs)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo_messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
def get_notifier(publisher_id):
"""Return a configured oslo_messaging notifier."""
return NOTIFIER.prepare(publisher_id=publisher_id)

49
kingbird/common/policy.py Normal file
View File

@ -0,0 +1,49 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Policy Engine For Kingbird
"""
# from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_policy import policy
from kingbird.common import exceptions
POLICY_ENFORCER = None
CONF = cfg.CONF
# @lockutils.synchronized('policy_enforcer', 'kingbird-')
def _get_enforcer(policy_file=None, rules=None, default_rule=None):
global POLICY_ENFORCER
if POLICY_ENFORCER is None:
POLICY_ENFORCER = policy.Enforcer(CONF,
policy_file=policy_file,
rules=rules,
default_rule=default_rule)
return POLICY_ENFORCER
def enforce(context, rule, target, do_raise=True, *args, **kwargs):
enforcer = _get_enforcer()
credentials = context.to_dict()
target = target or {}
if do_raise:
kwargs.update(exc=exceptions.Forbidden)
return enforcer.enforce(rule, target, credentials, do_raise,
*args, **kwargs)

View File

@ -1,135 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# copy and modify from Nova
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
]
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
import kingbird.common.context
import kingbird.common.exceptions
CONF = cfg.CONF
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
kingbird.common.exceptions.__name__,
]
EXTRA_EXMODS = []
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
return kingbird.common.context.Context.from_dict(context)
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -1,198 +0,0 @@
# Copyright (c) 2015 Huawei, Tech. Co,. Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# copy and modify from Nova
"""Generic service base class for all workers that run on hosts."""
import os
import random
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
import baserpc
from kingbird.common.i18n import _
from kingbird.common.i18n import _LE
from kingbird.common.i18n import _LI
from kingbird.common import context
from kingbird.common import rpc
from kingbird.common import version
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='Seconds between nodes reporting state to datastore'),
cfg.BoolOpt('periodic_enable',
default=True,
help='Enable periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='Range of seconds to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
]
CONF = cfg.CONF
CONF.register_opts(service_opts)
class Service(service.Service):
"""class Service
Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager and reports
its state to the database services table.
"""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, serializer=None,
*args, **kwargs):
super(Service, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
self.manager = manager
self.rpc_server = None
self.report_interval = report_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.interval_max = periodic_interval_max
self.serializer = serializer
self.saved_args, self.saved_kwargs = args, kwargs
def start(self):
ver_str = version.version_string_with_package()
LOG.info(_LI('Starting %(topic)s node (version %(version)s)'),
{'topic': self.topic, 'version': ver_str})
self.basic_config_check()
self.manager.init_host()
self.manager.pre_start_hook()
LOG.debug(_("Creating RPC server for service %s"), self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [
self.manager,
baserpc.BaseServerRPCAPI(self.manager.service_name)
]
endpoints.extend(self.manager.additional_endpoints)
self.rpc_server = rpc.get_server(target, endpoints, self.serializer)
self.rpc_server.start()
self.manager.post_start_hook()
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
self.tg.add_dynamic_timer(self.periodic_tasks,
initial_delay=initial_delay,
periodic_interval_max=self.interval_max)
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None,
serializer=None,):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(sys.argv[0])
if not topic:
topic = binary.rpartition('kingbird-')[2]
if not manager:
manager_cls = ('%s_manager' %
binary.rpartition('kingbird-')[2])
manager = CONF.get(manager_cls, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_enable is None:
periodic_enable = CONF.periodic_enable
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max,
serializer=serializer)
return service_obj
def kill(self):
self.stop()
def stop(self):
try:
self.rpc_server.stop()
self.rpcserver.wait()
except Exception:
pass
try:
self.manager.cleanup_host()
except Exception:
LOG.exception(_LE('Service error occurred during cleanup_host'))
pass
super(Service, self).stop()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def basic_config_check(self):
"""Perform basic config checks before starting processing."""
# Make sure the tempdir exists and is writable
# try:
# with utils.tempdir():
# pass
# except Exception as e:
# LOG.error(_LE('Temporary directory is invalid: %s'), e)
# sys.exit(1)

View File

@ -1,20 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'
TOPIC_KB_ENGINE = 'engine'

View File

@ -1,81 +0,0 @@
# Copyright 2016 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routines for configuring kingbird, largely copy from Neutron
"""
import sys
from oslo_config import cfg
from oslo_log import log as logging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import rpc
from kingbird.common import version
LOG = logging.getLogger(__name__)
common_opts = [
cfg.StrOpt('host', default='kingbird.host',
help=_("The host name for RPC server")),
cfg.IntOpt('workers', default=2,
help=_("number of workers")),
cfg.StrOpt('state_path',
default='/var/lib/kingbird',
deprecated_name='pybasedir',
help="Top-level directory for maintaining kingbird's state"),
]
def init(args, **kwargs):
# Register the configuration options
cfg.CONF.register_opts(common_opts)
# ks_session.Session.register_conf_options(cfg.CONF)
# auth.register_conf_options(cfg.CONF)
logging.register_options(cfg.CONF)
cfg.CONF(args=args, project='kingbird.engine',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
rpc.init(cfg.CONF)
def setup_logging():
"""Sets up the logging options for a log with supplied name."""
product_name = "kingbird.engine"
logging.setup(cfg.CONF, product_name)
LOG.info(_LI("Logging enabled!"))
LOG.info(_LI("%(prog)s version %(version)s"),
{'prog': sys.argv[0],
'version': version.version_info.release_string()})
LOG.debug("command line: %s", " ".join(sys.argv))
def reset_service():
# Reset worker in case SIGHUP is called.
# Note that this is called only in case a service is running in
# daemon mode.
setup_logging()
# TODO(joehuang) enforce policy later
# policy.refresh()
def list_opts():
yield None, common_opts

View File

@ -1,108 +0,0 @@
# Copyright 2016 Ericsson AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import uuid
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import manager
from kingbird.engine.quota_manager import QuotaManager
from kingbird.engine import scheduler
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
scheduler_opts = [
cfg.BoolOpt('periodic_enable',
default=True,
help='boolean value for enable/disenable periodic tasks'),
cfg.IntOpt('periodic_interval',
default=900,
help='periodic time interval for automatic quota sync job')
]
scheduler_opt_group = cfg.OptGroup('scheduler')
cfg.CONF.register_group(scheduler_opt_group)
cfg.CONF.register_opts(scheduler_opts, group=scheduler_opt_group)
class EngineManager(manager.Manager):
"""Manages all the kb engine activities."""
target = messaging.Target(version='1.0')
def __init__(self, *args, **kwargs):
self.engine_id = str(uuid.uuid4())
self.qm = QuotaManager()
self.TG = scheduler.ThreadGroupManager()
self.periodic_enable = cfg.CONF.scheduler.periodic_enable
self.periodic_interval = cfg.CONF.scheduler.periodic_interval
LOG.debug(_('Engine initialization...'))
super(EngineManager, self).__init__(service_name="engine_manager",
*args, **kwargs)
if self.periodic_enable:
LOG.debug("Adding periodic tasks for the engine to perform")
self.TG.add_timer(self.periodic_interval,
self.periodic_balance_all, None, self.engine_id)
def init_host(self):
LOG.debug(_('Engine init_host...'))
pass
def cleanup_host(self):
LOG.debug(_('Engine cleanup_host...'))
pass
def pre_start_hook(self):
LOG.debug(_('Engine pre_start_hook...'))
pass
def post_start_hook(self):
LOG.debug(_('Engine post_start_hook...'))
pass
def periodic_balance_all(self, engine_id):
# Automated Quota Sync for all the keystone projects
LOG.info(_LI("Periodic quota sync job started at: %s"),
time.strftime("%c"))
self.qm.periodic_balance_all(engine_id)
def quota_sync_for_project(self, ctx, project_id):
# On Demand Quota Sync for a project, will be triggered by KB-API
LOG.info(_LI("On Demand Quota Sync Called for: %s"),
project_id)
self.qm.quota_sync_for_project(project_id)
def get_total_usage_for_tenant(self, ctx, project_id):
# Returns a dictionary containing nova, neutron &
# cinder usages for the project
LOG.info(_LI("Get total tenant usage called for: %s"),
project_id)
return self.qm.get_total_usage_for_tenant(project_id)
def list_opts():
yield scheduler_opt_group.name, scheduler_opts

View File

@ -83,7 +83,7 @@ class QuotaManager(manager.Manager):
# Divide list of projects into batches and perfrom quota sync
# for one batch at a time.
for current_batch_projects in utils.get_batch_projects(
cfg.CONF.batch.batch_size, project_list):
cfg.CONF.batch.batch_size, project_list):
LOG.info(_LI("Syncing quota for current batch with projects: %s"),
current_batch_projects)
for current_project in current_batch_projects:

View File

@ -1,5 +1,3 @@
# Copyright 2016 Ericsson AB
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -24,9 +22,6 @@ LOG = logging.getLogger(__name__)
wallclock = time.time
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class ThreadGroupManager(object):
'''Thread group manager.'''

View File

@ -1,89 +1,140 @@
# Copyright 2016 Ericsson AB
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
import uuid
import functools
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service as srv
import oslo_messaging
from kingbird.common import consts
from kingbird.common import context
from kingbird.common import exceptions
from kingbird.common.i18n import _
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common.service import Service
from kingbird.common import topics
from kingbird.engine.listener import EngineManager
from kingbird.common.i18n import _LE
from kingbird.common.i18n import _LI
from kingbird.common import messaging as rpc_messaging
from kingbird.engine.quota_manager import QuotaManager
_TIMER_INTERVAL = 30
_TIMER_INTERVAL_MAX = 60
from kingbird.engine import scheduler
from oslo_service import service
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
host_opts = [
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
host_opt_group = cfg.OptGroup('host_details')
cfg.CONF.register_group(host_opt_group)
cfg.CONF.register_opts(host_opts, group=host_opt_group)
def request_context(func):
@functools.wraps(func)
def wrapped(self, ctx, *args, **kwargs):
if ctx is not None and not isinstance(ctx, context.RequestContext):
ctx = context.RequestContext.from_dict(ctx.to_dict())
try:
return func(self, ctx, *args, **kwargs)
except exceptions.KingbirdException:
raise oslo_messaging.rpc.dispatcher.ExpectedException()
return wrapped
class EngineService(Service):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, serializer=None,
*args, **kwargs):
super(EngineService, self).__init__(host, binary, topic, manager,
report_interval, periodic_enable,
periodic_fuzzy_delay,
periodic_interval_max, serializer,
*args, **kwargs)
class EngineService(service.Service):
'''Lifecycle manager for a running service engine.
- All the methods in here are called from the RPC client.
- If a RPC call does not have a corresponding method here, an exceptions
will be thrown.
- Arguments to these calls are added dynamically and will be treated as
keyword arguments by the RPC client.
'''
def create_service():
def __init__(self, host, topic, manager=None):
LOG.debug(_('create KB engine service'))
super(EngineService, self).__init__()
self.host = cfg.CONF.host
self.rpc_api_version = consts.RPC_API_VERSION
self.topic = consts.TOPIC_KB_ENGINE
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.engine_id = None
self.TG = None
self.periodic_enable = cfg.CONF.scheduler.periodic_enable
self.periodic_interval = cfg.CONF.scheduler.periodic_interval
self.target = None
self._rpc_server = None
self.qm = None
engine_manager = EngineManager()
engine_service = EngineService(
host=cfg.CONF.host_details.host,
binary="kb_engine",
topic=topics.TOPIC_KB_ENGINE,
manager=engine_manager,
periodic_enable=True,
report_interval=_TIMER_INTERVAL,
periodic_interval_max=_TIMER_INTERVAL_MAX,
serializer=Serializer()
)
def init_tgm(self):
self.TG = scheduler.ThreadGroupManager()
engine_service.start()
def init_qm(self):
self.qm = QuotaManager()
return engine_service
def start(self):
self.engine_id = str(uuid.uuid4())
self.init_tgm()
self.init_qm()
target = oslo_messaging.Target(version=self.rpc_api_version,
server=self.host,
topic=self.topic)
self.target = target
self._rpc_server = rpc_messaging.get_rpc_server(self.target, self)
self._rpc_server.start()
super(EngineService, self).start()
if self.periodic_enable:
LOG.info("Adding periodic tasks for the engine to perform")
self.TG.add_timer(self.periodic_interval,
self.periodic_balance_all, None, self.engine_id)
_launcher = None
def periodic_balance_all(self, engine_id):
# Automated Quota Sync for all the keystone projects
LOG.info(_LI("Periodic quota sync job started at: %s"),
time.strftime("%c"))
self.qm.periodic_balance_all(engine_id)
@request_context
def get_total_usage_for_tenant(self, context, project_id):
# Returns a dictionary containing nova, neutron &
# cinder usages for the project
LOG.info(_LI("Get total tenant usage called for: %s"),
project_id)
return self.qm.get_total_usage_for_tenant(project_id)
def serve(engine_service, workers=1):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
@request_context
def quota_sync_for_project(self, context, project_id):
# On Demand Quota Sync for a project, will be triggered by KB-API
LOG.info(_LI("On Demand Quota Sync Called for: %s"),
project_id)
self.qm.quota_sync_for_project(project_id)
_launcher = srv.launch(CONF, engine_service, workers=workers)
def _stop_rpc_server(self):
# Stop RPC connection to prevent new requests
LOG.debug(_("Attempting to stop engine service..."))
try:
self._rpc_server.stop()
self._rpc_server.wait()
LOG.info(_LI('Engine service stopped successfully'))
except Exception as ex:
LOG.error(_LE('Failed to stop engine service: %s'),
six.text_type(ex))
def stop(self):
self._stop_rpc_server()
def wait():
_launcher.wait()
self.TG.stop()
# Terminate the engine process
LOG.info(_LI("All threads were gone, terminating engine"))
super(EngineService, self).stop()

0
kingbird/rpc/__init__.py Normal file
View File

70
kingbird/rpc/client.py Normal file
View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''
Client side of the Kingbird RPC API.
'''
from oslo_config import cfg
from oslo_log import log as logging
from kingbird.common import config
from kingbird.common import consts
from kingbird.common import messaging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
config.register_options()
class EngineClient(object):
"""Client side of the kingbird engine rpc API.
Version History:
1.0 - Initial version (Mitaka 1.0 release)
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
self._client = messaging.get_rpc_client(
topic=consts.TOPIC_KB_ENGINE,
server=CONF.host,
version=self.BASE_RPC_API_VERSION)
@staticmethod
def make_msg(method, **kwargs):
return method, kwargs
def call(self, ctxt, msg, version=None):
method, kwargs = msg
if version is not None:
client = self._client.prepare(version=version)
else:
client = self._client
return client.call(ctxt, method, **kwargs)
def cast(self, ctxt, msg, version=None):
method, kwargs = msg
if version is not None:
client = self._client.prepare(version=version)
else:
client = self._client
return client.cast(ctxt, method, **kwargs)
def get_total_usage_for_tenant(self, ctxt, project_id):
return self.call(ctxt, self.make_msg('get_total_usage_for_tenant',
project_id=project_id))
def quota_sync_for_project(self, ctxt, project_id):
return self.cast(ctxt, self.make_msg('quota_sync_for_project',
project_id=project_id))

View File

@ -20,7 +20,7 @@ from oslo_config import cfg
from kingbird.api.controllers import quota_manager
from kingbird.common import config
from kingbird.common import rpc
from kingbird.rpc import client as rpc_client
from kingbird.tests.unit.api.testroot import KBApiTest
config.register_options()
OPT_GROUP_NAME = 'keystone_authtoken'
@ -40,7 +40,7 @@ class TestQuotaManager(KBApiTest):
cfg.CONF.set_override('admin_tenant', 'fake_tenant_id',
group='cache')
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_get_all_admin(self, mock_db_api):
Res = Result('tenant_1', 'ram', 100)
@ -54,9 +54,9 @@ class TestQuotaManager(KBApiTest):
self.assertEqual({'quota_set': {'project_id': 'tenant_1', 'ram': 100}},
eval(response.text))
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_get_default_admin(self, mock_db_api, mock_client):
def test_get_default_admin(self, mock_db_api):
mock_db_api.quota_class_get_default.return_value = \
{'class_name': 'default'}
response = self.app.get(
@ -69,16 +69,20 @@ class TestQuotaManager(KBApiTest):
cfg.CONF.kingbird_global_limit['quota_' + resource],
result['quota_set'][resource])
@mock.patch.object(rpc, 'get_client')
def test_get_usages_admin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient')
def test_get_usages_admin(self, mock_rpc_client):
expected_usage = {"ram": 10}
mock_rpc_client().get_total_usage_for_tenant.return_value = \
expected_usage
response = self.app.get(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/detail',
headers={'X_ROLE': 'admin'})
self.assertEqual(response.status_int, 200)
self.assertEqual(eval(response.body), {"quota_set": expected_usage})
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_put_admin(self, mock_db_api, mock_client):
def test_put_admin(self, mock_db_api):
Res = Result('tenant_1', 'cores', 10)
mock_db_api.quota_update.return_value = Res
data = {"quota_set": {Res.resource: Res.hard_limit}}
@ -90,9 +94,9 @@ class TestQuotaManager(KBApiTest):
self.assertEqual({Res.project_id: {Res.resource: Res.hard_limit}},
eval(response.text))
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_delete_admin(self, mock_db_api, mock_client):
def test_delete_admin(self, mock_db_api):
Res = Result('tenant_1', 'cores', 10)
mock_db_api.quota_destroy.return_value = Res
data = {"quota_set": [Res.resource]}
@ -104,9 +108,9 @@ class TestQuotaManager(KBApiTest):
self.assertEqual({'Deleted quota limits': [Res.resource]},
eval(response.text))
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_delete_all_admin(self, mock_db_api, mock_client):
def test_delete_all_admin(self, mock_db_api):
Res = Result('tenant_1', 'cores', 10)
mock_db_api.quota_destroy_all.return_value = Res
response = self.app.delete_json(
@ -116,8 +120,8 @@ class TestQuotaManager(KBApiTest):
self.assertEqual('Deleted all quota limits for the given project',
eval(response.text))
@mock.patch.object(rpc, 'get_client')
def test_quota_sync_admin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_quota_sync_admin(self):
response = self.app.put_json(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync',
headers={'X-Tenant-Id': 'fake_tenant',
@ -126,8 +130,8 @@ class TestQuotaManager(KBApiTest):
self.assertEqual("triggered quota sync for tenant_1",
eval(response.text))
@mock.patch.object(rpc, 'get_client')
def test_put_nonadmin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_put_nonadmin(self):
Res = Result('tenant_1', 'cores', 10)
data = {"quota_set": {Res.resource: Res.hard_limit}}
try:
@ -137,16 +141,16 @@ class TestQuotaManager(KBApiTest):
except webtest.app.AppError as admin_exception:
self.assertIn('Admin required', admin_exception.message)
@mock.patch.object(rpc, 'get_client')
def test_delete_all_nonadmin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_delete_all_nonadmin(self):
try:
self.app.delete_json('/v1.0/fake_tenant_id/os-quota-sets/tenant_1',
headers={'X-Tenant-Id': 'fake_tenant'})
except webtest.app.AppError as admin_exception:
self.assertIn('Admin required', admin_exception.message)
@mock.patch.object(rpc, 'get_client')
def test_delete_nonadmin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_delete_nonadmin(self):
Res = Result('tenant_1', 'cores', 10)
data = {"quota_set": {Res.resource: Res.hard_limit}}
try:
@ -156,8 +160,8 @@ class TestQuotaManager(KBApiTest):
except webtest.app.AppError as admin_exception:
self.assertIn('Admin required', admin_exception.message)
@mock.patch.object(rpc, 'get_client')
def test_quota_sync_nonadmin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_quota_sync_nonadmin(self):
try:
self.app.put_json(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync',
@ -165,7 +169,7 @@ class TestQuotaManager(KBApiTest):
except webtest.app.AppError as admin_exception:
self.assertIn('Admin required', admin_exception.message)
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_get_all_nonadmin(self, mock_db_api):
Res = Result('tenant_1', 'ram', 100)
@ -179,9 +183,9 @@ class TestQuotaManager(KBApiTest):
self.assertEqual({'quota_set': {'project_id': 'tenant_1', 'ram': 100}},
eval(response.text))
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_get_default_nonadmin(self, mock_db_api, mock_client):
def test_get_default_nonadmin(self, mock_db_api):
mock_db_api.quota_class_get_default.return_value = \
{'class_name': 'default'}
response = self.app.get(
@ -194,8 +198,8 @@ class TestQuotaManager(KBApiTest):
cfg.CONF.kingbird_global_limit['quota_' + resource],
result['quota_set'][resource])
@mock.patch.object(rpc, 'get_client')
def test_quota_sync_bad_request(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_quota_sync_bad_request(self):
try:
self.app.post_json(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/sync',
@ -205,9 +209,9 @@ class TestQuotaManager(KBApiTest):
self.assertIn('Bad response: 404 Not Found',
bad_method_exception.message)
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_put_invalid_payload(self, mock_db_api, mock_client):
def test_put_invalid_payload(self, mock_db_api):
Res = Result('tenant_1', 'cores', 10)
mock_db_api.quota_update.return_value = Res
data = {'quota': {Res.resource: Res.hard_limit}}
@ -220,9 +224,9 @@ class TestQuotaManager(KBApiTest):
self.assertIn('400 Bad Request',
invalid_payload_exception.message)
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_put_invalid_input(self, mock_db_api, mock_client):
def test_put_invalid_input(self, mock_db_api):
Res = Result('tenant_1', 'cores', -10)
mock_db_api.quota_update.return_value = Res
data = {"quota_set": {Res.resource: Res.hard_limit}}
@ -235,9 +239,9 @@ class TestQuotaManager(KBApiTest):
self.assertIn('400 Bad Request',
invalid_input_exception.message)
@mock.patch.object(rpc, 'get_client')
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
@mock.patch.object(quota_manager, 'db_api')
def test_delete_invalid_quota(self, mock_db_api, mock_client):
def test_delete_invalid_quota(self, mock_db_api):
Res = Result('tenant_1', 'invalid_quota', 10)
mock_db_api.quota_destroy.return_value = Res
data = {"quota_set": [Res.resource]}
@ -250,15 +254,19 @@ class TestQuotaManager(KBApiTest):
self.assertIn('The resource could not be found',
invalid_quota_exception.message)
@mock.patch.object(rpc, 'get_client')
def test_get_usages_nonadmin(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient')
def test_get_usages_nonadmin(self, mock_rpc_client):
expected_usage = {"ram": 10}
mock_rpc_client().get_total_usage_for_tenant.return_value = \
expected_usage
response = self.app.get(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/detail',
headers={'X_TENANT_ID': 'fake_tenant', 'X_USER_ID': 'nonadmin'})
self.assertEqual(response.status_int, 200)
self.assertEqual(eval(response.body), {"quota_set": expected_usage})
@mock.patch.object(rpc, 'get_client')
def test_quota_sync_bad_action(self, mock_client):
@mock.patch.object(rpc_client, 'EngineClient', new=mock.Mock())
def test_quota_sync_bad_action(self):
try:
self.app.put_json(
'/v1.0/fake_tenant_id/os-quota-sets/tenant_1/syncing',

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import pecan
from pecan.configuration import set_config
from pecan.testing import load_test_app
@ -26,7 +24,6 @@ from oslo_utils import uuidutils
from kingbird.api import api_config
from kingbird.common import config
from kingbird.common import rpc
from kingbird.tests import base
config.register_options()
@ -108,7 +105,6 @@ class TestRootController(KBApiTest):
class TestV1Controller(KBApiTest):
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_get(self):
response = self.app.get('/v1.0')
self.assertEqual(response.status_int, 200)
@ -129,23 +125,18 @@ class TestV1Controller(KBApiTest):
response = api_method('/v1.0', expect_errors=True)
self.assertEqual(response.status_int, 405)
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_post(self):
self._test_method_returns_405('post')
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_put(self):
self._test_method_returns_405('put')
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_patch(self):
self._test_method_returns_405('patch')
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_delete(self):
self._test_method_returns_405('delete')
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_head(self):
self._test_method_returns_405('head')
@ -161,7 +152,6 @@ class TestErrors(KBApiTest):
response = self.app.get('/assert_called_once', expect_errors=True)
self.assertEqual(response.status_int, 404)
@mock.patch.object(rpc, 'get_client', new=mock.Mock())
def test_bad_method(self):
response = self.app.patch('/v1.0/fake_tenant_id/bad_method',
expect_errors=True)

View File

@ -1,52 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from kingbird.engine import listener
from kingbird.tests import base
from kingbird.tests import utils
FAKE_PROJECT = 'fake_project'
FAKE_ENGINE_ID = str(uuid.uuid4())
class TestEngineManager(base.KingbirdTestCase):
def setUp(self):
super(TestEngineManager, self).setUp()
self.context = utils.dummy_context()
@mock.patch.object(listener, 'QuotaManager')
def test_init(self, mock_qm):
engine_manager = listener.EngineManager()
self.assertIsNotNone(engine_manager)
self.assertEqual(engine_manager.qm, mock_qm())
@mock.patch.object(listener, 'QuotaManager')
def test_periodic_balance_all(self, mock_qm):
engine_manager = listener.EngineManager()
engine_manager.periodic_balance_all(FAKE_ENGINE_ID)
mock_qm().periodic_balance_all.assert_called_once_with(FAKE_ENGINE_ID)
@mock.patch.object(listener, 'QuotaManager')
def test_quota_sync_for_project(self, mock_qm):
engine_manager = listener.EngineManager()
engine_manager.quota_sync_for_project(self.context, FAKE_PROJECT)
mock_qm().quota_sync_for_project.assert_called_once_with(FAKE_PROJECT)
@mock.patch.object(listener, 'QuotaManager')
def test_get_total_usage_for_tenant(self, mock_qm):
engine_manager = listener.EngineManager()
engine_manager.get_total_usage_for_tenant(self.context, FAKE_PROJECT)
mock_qm().get_total_usage_for_tenant.assert_called_once_with(
FAKE_PROJECT)

View File

@ -12,9 +12,10 @@
import mock
from kingbird.engine import listener
from kingbird.engine import scheduler
from kingbird.engine import service
from kingbird.tests import base
from kingbird.tests import utils
from oslo_config import cfg
CONF = cfg.CONF
@ -23,34 +24,74 @@ CONF = cfg.CONF
class TestEngineService(base.KingbirdTestCase):
def setUp(self):
super(TestEngineService, self).setUp()
self.tenant_id = 'fake_admin'
self.thm = scheduler.ThreadGroupManager()
self.context = utils.dummy_context(user='test_user',
tenant=self.tenant_id)
self.service_obj = service.EngineService('kingbird',
'kingbird-engine')
@mock.patch.object(listener, 'QuotaManager')
def test_init(self, mock_qm):
manager = listener.EngineManager()
engine_service = service.EngineService('127.0.0.1', 'engine',
'topic-A', manager)
self.assertIsNotNone(engine_service)
def test_init(self):
self.assertEqual(self.service_obj.host, 'localhost')
self.assertEqual(self.service_obj.topic, 'kingbird-engine')
self.assertEqual(self.service_obj.periodic_enable,
CONF.scheduler.periodic_enable)
self.assertEqual(self.service_obj.periodic_interval,
CONF.scheduler.periodic_interval)
def test_init_tgm(self):
self.service_obj.init_tgm()
self.assertIsNotNone(self.service_obj.TG)
@mock.patch.object(service, 'EngineService')
@mock.patch.object(listener, 'QuotaManager')
def test_create_service(mock_qm, mock_engine):
service.create_service()
mock_engine().start.assert_called_once_with()
@mock.patch.object(service, 'QuotaManager')
def test_init_qm(self, mock_quota_manager):
self.service_obj.init_qm()
self.assertIsNotNone(self.service_obj.qm)
@mock.patch.object(service, 'QuotaManager')
@mock.patch.object(service, 'rpc_messaging')
def test_start(self, mock_rpc, mock_quota_manager):
self.service_obj.start()
mock_rpc.get_rpc_server.assert_called_once_with(
self.service_obj.target, self.service_obj)
mock_rpc.get_rpc_server().start.assert_called_once_with()
@mock.patch.object(listener, 'QuotaManager')
@mock.patch.object(service, 'EngineService')
@mock.patch.object(service, 'srv')
def test_serve(mock_srv, mock_engine, mock_qm):
manager = listener.EngineManager()
engine_service = service.EngineService('127.0.0.1', 'engine',
'topic-A', manager)
service.serve(engine_service, 2)
mock_srv.launch.assert_called_once_with(CONF, engine_service, workers=2)
@mock.patch.object(service, 'QuotaManager')
def test_periodic_balance_all(self, mock_quota_manager):
self.service_obj.init_tgm()
self.service_obj.init_qm()
self.service_obj.periodic_balance_all(self.service_obj.engine_id)
mock_quota_manager().periodic_balance_all.\
assert_called_once_with(self.service_obj.engine_id)
@mock.patch.object(service, 'QuotaManager')
def test_get_total_usage_for_tenant(self, mock_quota_manager):
self.service_obj.init_tgm()
self.service_obj.init_qm()
self.service_obj.get_total_usage_for_tenant(
self.context, self.tenant_id)
mock_quota_manager().get_total_usage_for_tenant.\
assert_called_once_with(self.tenant_id)
@mock.patch.object(service, '_launcher')
def test_wait(mock_launcher):
service.wait()
mock_launcher.wait.assert_called_once_with()
@mock.patch.object(service, 'QuotaManager')
def test_quota_sync_for_project(self, mock_quota_manager):
self.service_obj.init_tgm()
self.service_obj.init_qm()
self.service_obj.quota_sync_for_project(
self.context, self.tenant_id)
mock_quota_manager().quota_sync_for_project.\
assert_called_once_with(self.tenant_id)
@mock.patch.object(service, 'QuotaManager')
@mock.patch.object(service, 'rpc_messaging')
def test_stop_rpc_server(self, mock_rpc, mock_quota_manager):
self.service_obj.start()
self.service_obj._stop_rpc_server()
mock_rpc.get_rpc_server().stop.assert_called_once_with()
@mock.patch.object(service, 'QuotaManager')
@mock.patch.object(service, 'rpc_messaging')
def test_stop(self, mock_rpc, mock_quota_manager):
self.service_obj.start()
self.service_obj.stop()
mock_rpc.get_rpc_server().stop.assert_called_once_with()

View File

View File

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kingbird.common import config
from kingbird.common import messaging
from kingbird.rpc import client as rpc_client
from kingbird.tests import base
from kingbird.tests import utils
config.register_options()
class EngineRpcAPITestCase(base.KingbirdTestCase):
def setUp(self):
messaging.setup("fake://", optional=True)
self.addCleanup(messaging.cleanup)
self.context = utils.dummy_context()
# self.stubs = stubout.StubOutForTesting()
self.rpcapi = rpc_client.EngineClient()
super(EngineRpcAPITestCase, self).setUp()
@mock.patch.object(messaging, 'get_rpc_client')
def test_call(self, mock_client):
client = mock.Mock()
mock_client.return_value = client
method = 'fake_method'
kwargs = {'key': 'value'}
rpcapi = rpc_client.EngineClient()
msg = rpcapi.make_msg(method, **kwargs)
# with no version
res = rpcapi.call(self.context, msg)
self.assertEqual(client, rpcapi._client)
client.call.assert_called_once_with(self.context, 'fake_method',
key='value')
self.assertEqual(res, client.call.return_value)
# with version
res = rpcapi.call(self.context, msg, version='123')
client.prepare.assert_called_once_with(version='123')
new_client = client.prepare.return_value
new_client.call.assert_called_once_with(self.context, 'fake_method',
key='value')
self.assertEqual(res, new_client.call.return_value)
@mock.patch.object(messaging, 'get_rpc_client')
def test_cast(self, mock_client):
client = mock.Mock()
mock_client.return_value = client
method = 'fake_method'
kwargs = {'key': 'value'}
rpcapi = rpc_client.EngineClient()
msg = rpcapi.make_msg(method, **kwargs)
# with no version
res = rpcapi.cast(self.context, msg)
self.assertEqual(client, rpcapi._client)
client.cast.assert_called_once_with(self.context, 'fake_method',
key='value')
self.assertEqual(res, client.cast.return_value)
# with version
res = rpcapi.cast(self.context, msg, version='123')
client.prepare.assert_called_once_with(version='123')
new_client = client.prepare.return_value
new_client.cast.assert_called_once_with(self.context, 'fake_method',
key='value')
self.assertEqual(res, new_client.cast.return_value)

View File

@ -82,7 +82,7 @@ def create_quota_limit(ctxt, **kwargs):
def dummy_context(user='test_username', tenant='test_project_id',
region_name=None):
return context.ContextBase.from_dict({
return context.RequestContext.from_dict({
'auth_token': 'abcd1234',
'user': user,
'tenant': tenant,