Restructure kingbird architecture

Remove Job Daemon & Job Worker, add Engine instead.
Remove the HelloWorld example and add a QuotaManager controller.
Add QuotaManager to the engine.
Update existing unit tests for the new design.
Add new unit tests.

Change-Id: I13d11cc696255e9e59dede314ca1e2a505933c1f
Ashish Singh 2016-02-08 18:55:34 +05:30
parent 73534bdd6f
commit b0c110d2f3
31 changed files with 431 additions and 1158 deletions

View File

@ -32,13 +32,13 @@ LOG = logging.getLogger(__name__)
from kingbird.common.i18n import _LI
from kingbird.common.i18n import _LW
from kingbird.jobdaemon import jdcfg
import kingbird.jobdaemon.jdservice as service
from kingbird.engine import engine_cfg
from kingbird.engine import service
def main():
jdcfg.init(sys.argv[1:])
jdcfg.setup_logging()
engine_cfg.init(sys.argv[1:])
engine_cfg.setup_logging()
host = CONF.host
workers = CONF.workers
@ -50,8 +50,8 @@ def main():
LOG.info(_LI("Server on http://%(host)s with %(workers)s"),
{'host': host, 'workers': workers})
jdservice = service.create_service()
service.serve(jdservice, workers)
engine_service = service.create_service()
service.serve(engine_service, workers)
service.wait()
LOG.info(_LI("Configuration:"))

View File

@ -1,62 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
if __name__ == "__main__":
eventlet.monkey_patch()
import sys
from oslo_config import cfg
from oslo_log import log as logging
import logging as std_logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
from kingbird.common.i18n import _LI
from kingbird.common.i18n import _LW
from kingbird.jobworker import jwcfg
import kingbird.jobworker.jwservice as service
def main():
jwcfg.init(sys.argv[1:])
jwcfg.setup_logging()
host = CONF.host
workers = CONF.workers
if workers < 1:
LOG.warning(_LW("Wrong worker number, worker = %(workers)s"), workers)
workers = 1
LOG.info(_LI("Server on http://%(host)s with %(workers)s"),
{'host': host, 'workers': workers})
jwservice = service.create_service()
service.serve(jwservice, workers)
service.wait()
LOG.info(_LI("Configuration:"))
CONF.log_opt_values(LOG, std_logging.INFO)
if __name__ == '__main__':
main()

View File

@ -67,33 +67,20 @@ function configure_kingbird_api {
}
function configure_kingbird_JD {
echo "Configuring kingbird jobdaemon service"
function configure_kingbird_engine {
echo "Configuring kingbird engine service"
if is_service_enabled kb-jd ; then
cp -p $KINGBIRD_DIR/etc/jobworker.conf $KINGBIRD_JD_CONF
iniset $KINGBIRD_JD_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $KINGBIRD_JD_CONF DEFAULT verbose True
iniset $KINGBIRD_JD_CONF DEFAULT use_syslog $SYSLOG
if is_service_enabled kb-engine ; then
cp -p $KINGBIRD_DIR/etc/engine.conf $KINGBIRD_ENGINE_CONF
iniset $KINGBIRD_ENGINE_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $KINGBIRD_ENGINE_CONF DEFAULT verbose True
iniset $KINGBIRD_ENGINE_CONF DEFAULT use_syslog $SYSLOG
setup_colorized_logging $KINGBIRD_JD_CONF DEFAULT
setup_colorized_logging $KINGBIRD_ENGINE_CONF DEFAULT
fi
}
function configure_kingbird_JW {
echo "Configuring kingbird jobdworker service"
if is_service_enabled kb-jw ; then
cp -p $KINGBIRD_DIR/etc/jobdaemon.conf $KINGBIRD_JW_CONF
iniset $KINGBIRD_JW_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $KINGBIRD_JW_CONF DEFAULT verbose True
iniset $KINGBIRD_JW_CONF DEFAULT use_syslog $SYSLOG
setup_colorized_logging $KINGBIRD_JW_CONF DEFAULT
fi
}
if [[ "$Q_ENABLE_KINGBIRD" == "True" ]]; then
if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
echo summary "Kingbird pre-install"
@ -109,8 +96,7 @@ if [[ "$Q_ENABLE_KINGBIRD" == "True" ]]; then
sudo install -d -o $STACK_USER -m 755 $KINGBIRD_CONF_DIR
configure_kingbird_api
configure_kingbird_JD
configure_kingbird_JW
configure_kingbird_engine
echo export PYTHONPATH=\$PYTHONPATH:$KINGBIRD_DIR >> $RC_DIR/.localrc.auto
@ -120,12 +106,8 @@ if [[ "$Q_ENABLE_KINGBIRD" == "True" ]]; then
elif [[ "$1" == "stack" && "$2" == "extra" ]]; then
echo_summary "Initializing Kingbird Service"
if is_service_enabled kb-jw; then
run_process kb-jw "python $KINGBIRD_JW_SERVICE --config-file=$KINGBIRD_JW_CONF"
fi
if is_service_enabled kb-jd; then
run_process kb-jd "python $KINGBIRD_JD_SERVICE --config-file=$KINGBIRD_JD_CONF"
if is_service_enabled kb-engine; then
run_process kb-engine "python $KINGBIRD_ENGINE --config-file=$KINGBIRD_ENGINE_CONF"
fi
if is_service_enabled kb-api; then
@ -138,12 +120,8 @@ if [[ "$Q_ENABLE_KINGBIRD" == "True" ]]; then
if [[ "$1" == "unstack" ]]; then
if is_service_enabled kb-jw; then
stop_process kb-jw
fi
if is_service_enabled kb-jd; then
stop_process kb-jd
if is_service_enabled kb-engine; then
stop_process kb-engine
fi
if is_service_enabled kb-api; then

View File

@ -24,8 +24,8 @@
# publish_errors = False
# Address to bind the API server to
host = kingbird.jobdaemon_host
# Address to bind the ENGINE server to
host = kingbird.engine_host
# The messaging driver to use, defaults to rabbit. Other
# drivers include qpid and zmq. (string value)

View File

@ -1,339 +0,0 @@
[DEFAULT]
# Print more verbose output (set logging level to INFO instead of default WARNING level).
# verbose = True
# Print debugging output (set logging level to DEBUG instead of default WARNING level).
# debug = True
# log_format = %(asctime)s %(levelname)8s [%(name)s] %(message)s
# log_date_format = %Y-%m-%d %H:%M:%S
# use_syslog -> syslog
# log_file and log_dir -> log_dir/log_file
# (not log_file) and log_dir -> log_dir/{binary_name}.log
# use_stderr -> stderr
# (not user_stderr) and (not log_file) -> stdout
# publish_errors -> notification system
# use_syslog = False
# syslog_log_facility = LOG_USER
# use_stderr = True
# log_file =
# log_dir =
# publish_errors = False
# Address to bind the API server to
host = kingbird.jobworker_host
# The messaging driver to use, defaults to rabbit. Other
# drivers include qpid and zmq. (string value)
rpc_backend=rabbit
# The default exchange under which topics are scoped. May be
# overridden by an exchange name specified in the
# transport_url option. (string value)
# control_exchange=openstack
[database]
# This line MUST be changed to actually run the plugin.
# Example:
# connection = mysql+pymysql://root:pass@127.0.0.1:3306/neutron
# Replace 127.0.0.1 above with the IP address of the database used by the
# main neutron server. (Leave it as is if the database runs on this host.)
# connection = sqlite://
# NOTE: In deployment the [database] section and its connection attribute may
# be set in the corresponding core plugin '.ini' file. However, it is suggested
# to put the [database] section and its connection attribute in this
# configuration file.
# Database engine for which script will be generated when using offline
# migration
# engine =
# The SQLAlchemy connection string used to connect to the slave database
# slave_connection =
# Database reconnection retry times - in event connectivity is lost
# set to -1 implies an infinite retry count
# max_retries = 10
# Database reconnection interval in seconds - if the initial connection to the
# database fails
# retry_interval = 10
# Minimum number of SQL connections to keep open in a pool
# min_pool_size = 1
# Maximum number of SQL connections to keep open in a pool
# max_pool_size = 10
# Timeout in seconds before idle sql connections are reaped
# idle_timeout = 3600
# If set, use this value for max_overflow with sqlalchemy
# max_overflow = 20
# Verbosity of SQL debugging information. 0=None, 100=Everything
# connection_debug = 0
# Add python stack traces to SQL as comment strings
# connection_trace = False
# If set, use this value for pool_timeout with sqlalchemy
# pool_timeout = 10
[client]
# Keystone authentication URL
# auth_url = http://127.0.0.1:5000/v3
# Keystone service URL
# identity_url = http://127.0.0.1:35357/v3
# If set to True, endpoint will be automatically refreshed if timeout
# accessing endpoint.
# auto_refresh_endpoint = False
# Name of top site which client needs to access
# top_site_name =
# Username of admin account for synchronizing endpoint with Keystone
# admin_username =
# Password of admin account for synchronizing endpoint with Keystone
# admin_password =
# Tenant name of admin account for synchronizing endpoint with Keystone
# admin_tenant =
# User domain name of admin account for synchronizing endpoint with Keystone
# admin_user_domain_name = default
# Tenant domain name of admin account for synchronizing endpoint with Keystone
# admin_tenant_domain_name = default
[oslo_concurrency]
# Directory to use for lock files. For security, the specified directory should
# only be writable by the user running the processes that need locking.
# Defaults to environment variable OSLO_LOCK_PATH. If external locks are used,
# a lock path must be set.
lock_path = $state_path/lock
# Enables or disables inter-process locks.
# disable_process_locking = False
[oslo_messaging_amqp]
#
# From oslo.messaging
#
# Address prefix used when sending to a specific server (string value)
# Deprecated group/name - [amqp1]/server_request_prefix
# server_request_prefix = exclusive
# Address prefix used when broadcasting to all servers (string value)
# Deprecated group/name - [amqp1]/broadcast_prefix
# broadcast_prefix = broadcast
# Address prefix when sending to any server in group (string value)
# Deprecated group/name - [amqp1]/group_request_prefix
# group_request_prefix = unicast
# Name for the AMQP container (string value)
# Deprecated group/name - [amqp1]/container_name
# container_name =
# Timeout for inactive connections (in seconds) (integer value)
# Deprecated group/name - [amqp1]/idle_timeout
# idle_timeout = 0
# Debug: dump AMQP frames to stdout (boolean value)
# Deprecated group/name - [amqp1]/trace
# trace = false
# CA certificate PEM file for verifing server certificate (string value)
# Deprecated group/name - [amqp1]/ssl_ca_file
# ssl_ca_file =
# Identifying certificate PEM file to present to clients (string value)
# Deprecated group/name - [amqp1]/ssl_cert_file
# ssl_cert_file =
# Private key PEM file used to sign cert_file certificate (string value)
# Deprecated group/name - [amqp1]/ssl_key_file
# ssl_key_file =
# Password for decrypting ssl_key_file (if encrypted) (string value)
# Deprecated group/name - [amqp1]/ssl_key_password
# ssl_key_password =
# Accept clients using either SSL or plain TCP (boolean value)
# Deprecated group/name - [amqp1]/allow_insecure_clients
# allow_insecure_clients = false
[oslo_messaging_qpid]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
# amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
# amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
# rpc_conn_pool_size = 30
# Qpid broker hostname. (string value)
# Deprecated group/name - [DEFAULT]/qpid_hostname
# qpid_hostname = localhost
# Qpid broker port. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_port
# qpid_port = 5672
# Qpid HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/qpid_hosts
# qpid_hosts = $qpid_hostname:$qpid_port
# Username for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_username
# qpid_username =
# Password for Qpid connection. (string value)
# Deprecated group/name - [DEFAULT]/qpid_password
# qpid_password =
# Space separated list of SASL mechanisms to use for auth. (string value)
# Deprecated group/name - [DEFAULT]/qpid_sasl_mechanisms
# qpid_sasl_mechanisms =
# Seconds between connection keepalive heartbeats. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_heartbeat
# qpid_heartbeat = 60
# Transport to use, either 'tcp' or 'ssl'. (string value)
# Deprecated group/name - [DEFAULT]/qpid_protocol
# qpid_protocol = tcp
# Whether to disable the Nagle algorithm. (boolean value)
# Deprecated group/name - [DEFAULT]/qpid_tcp_nodelay
# qpid_tcp_nodelay = true
# The number of prefetched messages held by receiver. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_receiver_capacity
# qpid_receiver_capacity = 1
# The qpid topology version to use. Version 1 is what was originally used by
# impl_qpid. Version 2 includes some backwards-incompatible changes that allow
# broker federation to work. Users should update to version 2 when they are
# able to take everything down, as it requires a clean break. (integer value)
# Deprecated group/name - [DEFAULT]/qpid_topology_version
# qpid_topology_version = 1
[oslo_messaging_rabbit]
#
# From oslo.messaging
#
# Use durable queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
# amqp_durable_queues = false
# Auto-delete queues in AMQP. (boolean value)
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
# amqp_auto_delete = false
# Size of RPC connection pool. (integer value)
# Deprecated group/name - [DEFAULT]/rpc_conn_pool_size
# rpc_conn_pool_size = 30
# SSL version to use (valid only if SSL enabled). Valid values are TLSv1 and
# SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be available on some
# distributions. (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_version
# kombu_ssl_version =
# SSL key file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_keyfile
# kombu_ssl_keyfile =
# SSL cert file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_certfile
# kombu_ssl_certfile =
# SSL certification authority file (valid only if SSL enabled). (string value)
# Deprecated group/name - [DEFAULT]/kombu_ssl_ca_certs
# kombu_ssl_ca_certs =
# How long to wait before reconnecting in response to an AMQP consumer cancel
# notification. (floating point value)
# Deprecated group/name - [DEFAULT]/kombu_reconnect_delay
# kombu_reconnect_delay = 1.0
# The RabbitMQ broker address where a single node is used. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_host
rabbit_host = localhost
# The RabbitMQ broker port where a single node is used. (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_port
rabbit_port = 5672
# RabbitMQ HA cluster host:port pairs. (list value)
# Deprecated group/name - [DEFAULT]/rabbit_hosts
rabbit_hosts = $rabbit_host:$rabbit_port
# Connect over SSL for RabbitMQ. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_use_ssl
# rabbit_use_ssl = false
# The RabbitMQ userid. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_userid
rabbit_userid = guest
# The RabbitMQ password. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_password
rabbit_password = guest
# The RabbitMQ login method. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_login_method
# rabbit_login_method = AMQPLAIN
# The RabbitMQ virtual host. (string value)
# Deprecated group/name - [DEFAULT]/rabbit_virtual_host
# rabbit_virtual_host = /
# How frequently to retry connecting with RabbitMQ. (integer value)
# rabbit_retry_interval = 1
# How long to backoff for between retries when connecting to RabbitMQ. (integer
# value)
# Deprecated group/name - [DEFAULT]/rabbit_retry_backoff
# rabbit_retry_backoff = 2
# Maximum number of RabbitMQ connection retries. Default is 0 (infinite retry
# count). (integer value)
# Deprecated group/name - [DEFAULT]/rabbit_max_retries
# rabbit_max_retries = 0
# Use HA queues in RabbitMQ (x-ha-policy: all). If you change this option, you
# must wipe the RabbitMQ database. (boolean value)
# Deprecated group/name - [DEFAULT]/rabbit_ha_queues
# rabbit_ha_queues = false
# Deprecated, use rpc_backend=kombu+memory or rpc_backend=fake (boolean value)
# Deprecated group/name - [DEFAULT]/fake_rabbit
# fake_rabbit = false

View File

@ -5,12 +5,11 @@ api
Kingbird API is Web Server Gateway Interface (WSGI) applications to receive
and process API calls, including keystonemiddleware to do the authentication,
parameter check and validation, convert API calls to job rpc message, and
then send the job to Kingbird Job Daemon through the queue. If the job will
be processed by Kingbird Job Daemon in synchronous way, the Kingbird API will
wait for the response from the Kingbird Job Daemon. Otherwise, the Kingbird
then send the job to the Kingbird Engine through the queue. If the job will
be processed by the Kingbird Engine in a synchronous way, the Kingbird API will
wait for the response from the Kingbird Engine. Otherwise, the Kingbird
API will send response to the API caller first, and then send the job to
Kingbird Job Daemon in asynchronous way. One of the Kingbird Job Daemons
will be the owner of the job.
Kingbird Engine in an asynchronous way.
Multiple Kingbird API could run in parallel, and also can work in multi-worker
mode.
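
A minimal sketch of this dispatch pattern, condensed from the controller and
engine code elsewhere in this commit (the helper name send_job is hypothetical;
'say_hello_world_call'/'say_hello_world_cast' are the placeholder engine
endpoints used in this commit, and the RPC transport is assumed to have been
initialized via rpc.init()):

import oslo_messaging as messaging

from kingbird.common import rpc
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common import topics


def send_job(context, payload, synchronous=True):
    # Target the engine topic; version '1.0' matches the EngineManager target.
    target = messaging.Target(topic=topics.TOPIC_KB_ENGINE, version='1.0')
    client = rpc.get_client(target, version_cap='1.0', serializer=Serializer())
    if synchronous:
        # call() blocks until the EngineManager endpoint returns its result.
        return client.call(context, 'say_hello_world_call', payload=payload)
    # cast() returns immediately; the engine processes the job asynchronously.
    client.cast(context, 'say_hello_world_cast', payload=payload)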

View File

@ -7,8 +7,8 @@ API request processing
root.py:
API root request
helloworld.py:
sample for adding a new resource/controller for http request processing
quota_manager.py:
Controller for all quota-related requests
restcomm.py:
common functionality used in API

View File

@ -1,77 +0,0 @@
# Copyright (c) 2015 Huawei Tech. Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from pecan import expose
from pecan import request
from pecan import rest
import restcomm
from kingbird.jobdaemon import jdrpcapi
class HelloWorldController(rest.RestController):
def __init__(self, *args, **kwargs):
super(HelloWorldController, self).__init__(*args, **kwargs)
self.jd_api = jdrpcapi.JobDaemonAPI()
@expose(generic=True, template='json')
def index(self):
if pecan.request.method != 'GET':
pecan.abort(405)
context = restcomm.extract_context_from_environ()
if context.is_admin:
return {'hello world message for admin': 'GET'}
else:
return {'hello world message for non-admin': 'GET'}
@index.when(method='PUT', template='json')
def put(self):
context = restcomm.extract_context_from_environ()
payload = '## put call ##, request.body = '
payload = payload + request.body
return self.jd_api.say_hello_world_call(context, payload)
@index.when(method='POST', template='json')
def post(self):
context = restcomm.extract_context_from_environ()
payload = '## post call ##, request.body = '
payload = payload + request.body
return self.jd_api.say_hello_world_call(context, payload)
@index.when(method='delete', template='json')
def delete(self):
# no return value to browser indeed for cast. check the log info in
# jdmanager, jwmanager instead
context = restcomm.extract_context_from_environ()
payload = '## delete cast ##, request.body is null'
payload = payload + request.body
self.jd_api.say_hello_world_cast(context, payload)
return self._delete_response(context)
def _delete_response(self, context):
context = context
return {'cast example': 'check the log produced by jobdaemon '
+ 'and jobworker, no value returned here'}

View File

@ -0,0 +1,120 @@
# Copyright (c) 2016 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import pecan
from pecan import expose
from pecan import request
from pecan import rest
import restcomm
from kingbird.common import rpc
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common import topics
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('kb-engine',
help='Set a version cap for messages sent to '
'kb-engine services. If you plan to do a '
'live upgrade from an old version to a '
'newer version, you should set this option '
'to the old version before beginning the '
'live upgrade procedure. Only upgrading '
'to the next version is supported, so you '
'cannot skip a release for the live upgrade '
'procedure.')
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
LOG = logging.getLogger(__name__)
class QuotaManagerController(rest.RestController):
VERSION_ALIASES = {
'mitaka': '1.0',
}
def __init__(self, *args, **kwargs):
super(QuotaManagerController, self).__init__(*args, **kwargs)
target = messaging.Target(topic=topics.TOPIC_KB_ENGINE, version='1.0')
upgrade_level = CONF.upgrade_levels.kb_engine
version_cap = 1.0
if upgrade_level == 'auto':
version_cap = self._determine_version_cap(target)
else:
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = Serializer()
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
# to do the version compatibility for future purpose
def _determine_version_cap(self, target):
version_cap = 1.0
return version_cap
@expose(generic=True, template='json')
def index(self, arg=None):
if pecan.request.method != 'GET':
pecan.abort(405)
context = restcomm.extract_context_from_environ()
if context.is_admin:
return {'Admin call for index with project': arg}
else:
return {'Non admin call for index with project': arg}
@index.when(method='PUT', template='json')
def put(self):
context = restcomm.extract_context_from_environ()
payload = '## put call ##, request.body = '
payload = payload + request.body
# To illustrate RPC, below line is written. Will be replaced by
# DB API call for updating quota limits for a tenant
return self.client.call(context, 'say_hello_world_call',
payload=payload)
@index.when(method='POST', template='json')
def post(self):
context = restcomm.extract_context_from_environ()
payload = '## post call ##, request.body = '
payload = payload + request.body
# To illustrate RPC, below line is written. Will be replaced by
# DB API call for creating quota limits for a tenant
return self.client.call(context, 'say_hello_world_call',
payload=payload)
@index.when(method='delete', template='json')
def delete(self):
context = restcomm.extract_context_from_environ()
payload = '## delete cast ##, request.body is null'
payload = payload + request.body
# To illustrate RPC, below line is written. Will be replaced by
# DB API call for deleting quota limits for a tenant
self.client.cast(context, 'say_hello_world_cast', payload=payload)
return self._delete_response(context)
def _delete_response(self, context):
context = context
return {'cast example': 'check the log produced by engine '
+ 'and no value returned here'}
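
For illustration, the endpoints added above could be exercised roughly as
follows (a hedged sketch: the base URL, port and tenant id are placeholders,
and keystonemiddleware authentication is ignored; the headers mirror the ones
used by the functional tests later in this commit):

import requests

BASE = 'http://127.0.0.1:8080/v1.0/quota'   # placeholder host/port
HEADERS = {'X-Tenant-Id': 'tenid'}          # a real deployment also needs a token

# GET maps to index(); the 'arg' query parameter is echoed back per role.
print(requests.get(BASE, params={'arg': 'tenant_1'}, headers=HEADERS).json())

# PUT and POST are forwarded to the engine through a blocking RPC call.
print(requests.put(BASE, headers=HEADERS).json())
print(requests.post(BASE, headers=HEADERS).json())

# DELETE issues an RPC cast; the response only acknowledges the cast.
print(requests.delete(BASE, headers=HEADERS).json())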

View File

@ -16,7 +16,7 @@
import pecan
from kingbird.api.controllers import helloworld
from kingbird.api.controllers import quota_manager
class RootController(object):
@ -58,7 +58,7 @@ class V1Controller(object):
def __init__(self):
self.sub_controllers = {
"helloworld": helloworld.HelloWorldController()
"quota": quota_manager.QuotaManagerController()
}
for name, ctrl in self.sub_controllers.items():

View File

@ -48,6 +48,16 @@ from kingbird.db import base
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
host_opts = [
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
host_opt_group = cfg.OptGroup('host_details')
cfg.CONF.register_group(host_opt_group)
cfg.CONF.register_opts(host_opts, group=host_opt_group)
class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
@ -58,7 +68,7 @@ class Manager(base.Base, PeriodicTasks):
def __init__(self, host=None, db_driver=None, service_name='undefined'):
if not host:
host = CONF.host
host = cfg.CONF.host_details.host
self.host = host
self.service_name = service_name
# self.notifier = rpc.get_notifier(self.service_name, self.host)
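
For reference, a standalone sketch of how the new [host_details] option group
behaves (the registration mirrors the lines added above; the override value is
arbitrary and only stands in for what an operator would set in the engine
configuration file):

from oslo_config import cfg

CONF = cfg.CONF

host_opts = [
    cfg.StrOpt('host', default='localhost',
               help='hostname of the machine'),
]
host_opt_group = cfg.OptGroup('host_details')
CONF.register_group(host_opt_group)
CONF.register_opts(host_opts, group=host_opt_group)

if __name__ == '__main__':
    CONF(args=[])                       # no CLI args, no config files
    # In a deployment this comes from a [host_details] section in the config
    # file; set_override is used here only for illustration.
    CONF.set_override('host', 'engine-node-1', group='host_details')
    print(CONF.host_details.host)       # -> 'engine-node-1'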

View File

@ -17,5 +17,4 @@ CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'
TOPIC_JOBDAEMON = 'job_daemon'
TOPIC_JOBWORKER = 'job_worker'
TOPIC_KB_ENGINE = 'engine'

kingbird/engine/README.rst (new executable file, 30 lines)
View File

@ -0,0 +1,30 @@
===============================
Service
===============================
Kingbird Service has responsibility for:
Delegate tasks to the concerned engine component, managed by the EngineManager
in listener.py
Monitor the status of jobs/smaller jobs, and return the result to the Kingbird
API if needed.
Generate tasks to purge timed-out jobs from the Kingbird Database
Multiple Kingbird API could run in parallel, and also can work in
multi-worker mode.
Multiple Kingbird Engine will be designed and run in stateless mode,
persistent data will be accessed (read and write) from the Kingbird
Database through the DAL module.
service.py:
run KB service in multi-worker mode, and establish RPC server
listener.py:
Manages all engine side activities such as Quota Enforcement,
synchronisation of ssh keys, images, flavors, security groups,
etc. across regions
engine_cfg.py:
configuration and initialization for Engine service
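
Condensed from the kingbird-engine launcher and the service module touched by
this commit, the bootstrap flow is roughly the following sketch (eventlet
monkey patching, the worker-count sanity check and startup logging of the real
launcher are omitted):

import sys

from oslo_config import cfg

from kingbird.engine import engine_cfg
from kingbird.engine import service


def main():
    engine_cfg.init(sys.argv[1:])       # register options, set up RPC transport
    engine_cfg.setup_logging()
    engine_service = service.create_service()        # EngineManager + RPC server
    service.serve(engine_service, cfg.CONF.workers)  # multi-worker launch
    service.wait()


if __name__ == '__main__':
    main()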

View File

View File

@ -1,4 +1,4 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# Copyright 2016 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -22,19 +22,15 @@ import sys
from oslo_config import cfg
from oslo_log import log as logging
from kingbird.common import rpc
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
# from kingbird import policy
from kingbird.common import rpc
from kingbird.common import version
LOG = logging.getLogger(__name__)
common_opts = [
cfg.StrOpt('host', default='kingbird.jwhost',
cfg.StrOpt('host', default='kingbird.servicehost',
help=_("The host name for RPC server")),
cfg.IntOpt('workers', default=2,
help=_("number of workers")),
@ -53,7 +49,7 @@ def init(args, **kwargs):
# auth.register_conf_options(cfg.CONF)
logging.register_options(cfg.CONF)
cfg.CONF(args=args, project='kingbird.jobworker',
cfg.CONF(args=args, project='kingbird.engine',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
@ -62,7 +58,7 @@ def init(args, **kwargs):
def setup_logging():
"""Sets up the logging options for a log with supplied name."""
product_name = "kingbird.jobworker"
product_name = "kingbird.engine"
logging.setup(cfg.CONF, product_name)
LOG.info(_LI("Logging enabled!"))
LOG.info(_LI("%(prog)s version %(version)s"),

View File

@ -1,4 +1,4 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# Copyright 2016 Ericsson AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,61 +20,56 @@ import oslo_messaging as messaging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import manager
from kingbird.engine.quota_manager import QuotaManager
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class JWManager(manager.Manager):
"""Manages the running job worker from creation to destruction."""
class EngineManager(manager.Manager):
"""Manages all the kb engine activities."""
target = messaging.Target(version='1.0')
def __init__(self, *args, **kwargs):
self.qm = QuotaManager()
LOG.debug(_('Engine initialization...'))
super(JWManager, self).__init__(service_name="job_worker",
super(EngineManager, self).__init__(service_name="engine_manager",
*args, **kwargs)
LOG.debug(_('JWManager initialization...'))
def init_host(self):
LOG.debug(_('JWManager init_host...'))
LOG.debug(_('Engine init_host...'))
pass
def cleanup_host(self):
LOG.debug(_('JWManager cleanup_host...'))
LOG.debug(_('Engine cleanup_host...'))
pass
def pre_start_hook(self):
LOG.debug(_('JWManager pre_start_hook...'))
LOG.debug(_('Engine pre_start_hook...'))
pass
def post_start_hook(self):
LOG.debug(_('JWManager post_start_hook...'))
LOG.debug(_('Engine post_start_hook...'))
pass
# rpc message endpoint handling
def say_hello_world_call(self, ctx, payload):
LOG.info(_LI("jobworker say hello world, call payload: %s"), payload)
LOG.info(_LI("engine say hello world, call payload: %s"), payload)
info_text = "payload: %s" % payload
return {'jobworker': info_text}
return info_text
def say_hello_world_cast(self, ctx, payload):
LOG.info(_LI("jobworker say hello world, cast payload: %s"), payload)
LOG.info(_LI("engine say hello world, cast payload: %s"), payload)
# no return value to browser indeed for cast. check the log info
info_text = "payload: %s" % payload
return {'jobworker': info_text}
return {'engine': info_text}

View File

@ -0,0 +1,40 @@
# Copyright 2016 Ericsson AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import manager
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class QuotaManager(manager.Manager):
"""Manages tasks related to quota management."""
def __init__(self, *args, **kwargs):
LOG.debug(_('QuotaManager initialization...'))
super(QuotaManager, self).__init__(service_name="quota_manager",
*args, **kwargs)
def periodic_balance_all(self, ctx):
# TODO(Ashish): Implement Quota Syncing
LOG.info(_LI("periodically balance quota for all keystone tenants"))
pass

View File

@ -1,4 +1,4 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# Copyright 2016 Ericsson AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,22 +22,31 @@ from kingbird.common.i18n import _
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common.service import Service
from kingbird.common import topics
from kingbird.jobworker.jwmanager import JWManager
from kingbird.engine.listener import EngineManager
_TIMER_INTERVAL = 30
_TIMER_INTERVAL_MAX = 60
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
host_opts = [
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
host_opt_group = cfg.OptGroup('host_details')
cfg.CONF.register_group(host_opt_group)
cfg.CONF.register_opts(host_opts, group=host_opt_group)
class JWService(Service):
class EngineService(Service):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, serializer=None,
*args, **kwargs):
super(JWService, self).__init__(host, binary, topic, manager,
super(EngineService, self).__init__(host, binary, topic, manager,
report_interval, periodic_enable,
periodic_fuzzy_delay,
periodic_interval_max, serializer,
@ -46,34 +55,34 @@ class JWService(Service):
def create_service():
LOG.debug(_('create job worker server'))
LOG.debug(_('create KB engine service'))
jwmanager = JWManager()
jwservice = JWService(
host=CONF.host,
binary="job_worker",
topic=topics.TOPIC_JOBWORKER,
manager=jwmanager,
engine_manager = EngineManager()
engine_service = EngineService(
host=cfg.CONF.host_details.host,
binary="kb_engine",
topic=topics.TOPIC_KB_ENGINE,
manager=engine_manager,
periodic_enable=True,
report_interval=_TIMER_INTERVAL,
periodic_interval_max=_TIMER_INTERVAL_MAX,
serializer=Serializer()
)
jwservice.start()
engine_service.start()
return jwservice
return engine_service
_launcher = None
def serve(jwservice, workers=1):
def serve(engine_service, workers=1):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = srv.launch(CONF, jwservice, workers=workers)
_launcher = srv.launch(CONF, engine_service, workers=workers)
def wait():

View File

@ -1,46 +0,0 @@
===============================
jobdaemon
===============================
Kingbird Job Daemon has responsibility for:
Divid job from Kingbird API to smaller jobs, each smaller job will only
be involved with one specific region, and one smaller job will be
dispatched to one Kingbird Job Worker. Multiple smaller jobs may be
dispatched to the same Kingbird Job Worker, its up to the load balancing
policy and how many Kingbird Job Workers are running.
Some job from Kingbird API could not be divided, schedule and re-schedule
such kind of (periodically running, like quota enforcement, regular
event statistic collection task) job to a specific Kingbird Job Worker.
If some Kingbird Job Worker failed, re-balance the job to other Kingbird
Job Workers.
Monitoring the job/smaller jobs status, and return the result to Kingbird
API if needed.
Generate task to purge time-out jobs from Kingbird Database
Multiple Job Daemon could run in parallel, and also can work in
multi-worker mode. But for one job from Kingbird API, only one Kingbird
Job Daemon will be the owner. One Kingbird Job Daemon could be the owner
of multiple jobs from multiple Kingbird APIs
Multiple Kingbird Daemon will be designed and run in stateless mode,
persistent data will be accessed (read and write) from the Kingbird
Database through the DAL module.
jdrpcapi.py:
the client side RPC api for JobDaemon. Often the API service will
call the api provided in this file, and the RPC client will send the
request to message-bus, and then the JobDaemon can pickup the RPC message
from the message bus
jdservice.py:
run JobDaemon in multi-worker mode, and establish RPC server
jdmanager.py:
all rpc messages received by the jdservice RPC server will be processed
in the jdmanager's regarding function.
jdcfg.py:
configuration and initialization for JobDaemon

View File

@ -1,81 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routines for configuring kingbird, largely copy from Neutron
"""
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import rpc
from oslo_config import cfg
from oslo_log import log as logging
import sys
# from kingbird import policy
from kingbird.common import version
LOG = logging.getLogger(__name__)
common_opts = [
cfg.StrOpt('host', default='kingbird.jdhost',
help=_("The host name for RPC server")),
cfg.IntOpt('workers', default=2,
help=_("number of workers")),
cfg.StrOpt('state_path',
default='/var/lib/kingbird',
deprecated_name='pybasedir',
help="Top-level directory for maintaining kingbird's state"),
]
def init(args, **kwargs):
# Register the configuration options
cfg.CONF.register_opts(common_opts)
# ks_session.Session.register_conf_options(cfg.CONF)
# auth.register_conf_options(cfg.CONF)
logging.register_options(cfg.CONF)
cfg.CONF(args=args, project='kingbird.jobdaemon',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
rpc.init(cfg.CONF)
def setup_logging():
"""Sets up the logging options for a log with supplied name."""
product_name = "kingbird.jobdaemon"
logging.setup(cfg.CONF, product_name)
LOG.info(_LI("Logging enabled!"))
LOG.info(_LI("%(prog)s version %(version)s"),
{'prog': sys.argv[0],
'version': version.version_info.release_string()})
LOG.debug("command line: %s", " ".join(sys.argv))
def reset_service():
# Reset worker in case SIGHUP is called.
# Note that this is called only in case a service is running in
# daemon mode.
setup_logging()
# TODO(joehuang) enforce policy later
# policy.refresh()

View File

@ -1,83 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from kingbird.common.i18n import _
from kingbird.common.i18n import _LI
from kingbird.common import manager
from kingbird.jobworker import jwrpcapi
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class JDManager(manager.Manager):
"""Manages the running job from creation to destruction."""
target = messaging.Target(version='1.0')
def __init__(self, *args, **kwargs):
LOG.debug(_('JDManager initialization...'))
super(JDManager, self).__init__(service_name="job_daemon",
*args, **kwargs)
self.jw_api = jwrpcapi.JobWorkerAPI()
def init_host(self):
LOG.debug(_('JDManager init_host...'))
pass
def cleanup_host(self):
LOG.debug(_('JDManager cleanup_host...'))
pass
def pre_start_hook(self):
LOG.debug(_('JDManager pre_start_hook...'))
pass
def post_start_hook(self):
LOG.debug(_('JDManager post_start_hook...'))
pass
# rpc message endpoint handling
def say_hello_world_call(self, ctx, payload):
LOG.info(_LI("jobdaemon say hello world, call payload: %s"), payload)
info_text = "payload: %s" % payload
report = self.jw_api.say_hello_world_call(ctx,
'jobdaemon forward'
+ info_text)
report['jobdaemon'] = info_text
return report
def say_hello_world_cast(self, ctx, payload):
LOG.info(_LI("jobdaemon say hello world, cast payload: %s"), payload)
self.jw_api.say_hello_world_cast(ctx, 'hello from jobdaemon')
# no return value to browser indeed for cast. check the log info
info_text = "payload: %s" % payload
return {'jobdaemon': info_text}

View File

@ -1,80 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the job daemon RPC API.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from kingbird.common import rpc
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common import topics
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('jobdaemon',
help='Set a version cap for messages sent to'
'jobdaemon services. If you plan to do a'
'live upgrade from an old version to a'
'newer version, you should set this option'
'to the old version before beginning the'
'live upgrade procedure. Only upgrading'
'to the next version is supported, so you'
'cannot skip a release for the live upgrade'
'procedure.')
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
LOG = logging.getLogger(__name__)
class JobDaemonAPI(object):
"""Client side of the job daemon rpc API.
API version history:
* 1.0 - Initial version.
"""
VERSION_ALIASES = {
'mitaka': '1.0',
}
def __init__(self):
super(JobDaemonAPI, self).__init__()
target = messaging.Target(topic=topics.TOPIC_JOBDAEMON, version='1.0')
upgrade_level = CONF.upgrade_levels.jobdaemon
version_cap = 1.0
if upgrade_level == 'auto':
version_cap = self._determine_version_cap(target)
else:
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = Serializer()
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
# to do the version compatibility for future purpose
def _determine_version_cap(self, target):
version_cap = 1.0
return version_cap
def say_hello_world_call(self, ctxt, payload):
return self.client.call(ctxt, 'say_hello_world_call', payload=payload)
def say_hello_world_cast(self, ctxt, payload):
return self.client.cast(ctxt, 'say_hello_world_cast', payload=payload)

View File

@ -1,79 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service as srv
from kingbird.common.i18n import _
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common.service import Service
from kingbird.common import topics
from kingbird.jobdaemon.jdmanager import JDManager
_TIMER_INTERVAL = 30
_TIMER_INTERVAL_MAX = 60
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class JDService(Service):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, serializer=None,
*args, **kwargs):
super(JDService, self).__init__(host, binary, topic, manager,
report_interval, periodic_enable,
periodic_fuzzy_delay,
periodic_interval_max, serializer,
*args, **kwargs)
def create_service():
LOG.debug(_('create job daemon server'))
jdmanager = JDManager()
jdservice = JDService(
host=CONF.host,
binary="job_daemon",
topic=topics.TOPIC_JOBDAEMON,
manager=jdmanager,
periodic_enable=True,
report_interval=_TIMER_INTERVAL,
periodic_interval_max=_TIMER_INTERVAL_MAX,
serializer=Serializer()
)
jdservice.start()
return jdservice
_launcher = None
def serve(jdservice, workers=1):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = srv.launch(CONF, jdservice, workers=workers)
def wait():
_launcher.wait()

View File

@ -1,38 +0,0 @@
===============================
jobworker
===============================
Kingbird Job Worker has responsibility for:
Concurrently process the divided smaller jobs from Kingbird Job Daemon.
Each smaller job will be a job to a specific OpenStack instance, i.e.,
one OpenStack region.
Periodically running background job which was assigned by the Kingbird
Job Daemon, Kingbird Job Worker will generate a new one-time job (for
example, for quota enforcement, generate a collecting resource usage job),
and send it to the Kingbird Job Daemon for further processing in each
cycle. Multiple Job Worker could run in parallel, and also can work in
multi-worker mode. But for one smaller job from Kingbird Job Daemon,
only one Kingbird Job Worker will be the owner. One Kingbird Job Worker
could be the owner of multiple smaller jobs from multiple Kingbird
JobDaemons.
Multiple Kingbird Job Workers will be designed and run in stateless mode,
persistent data will be accessed (read and write) from the Kingbird
Database through the DAL module.
jwrpcapi.py:
the client side RPC api for JobWoker. Often the JobDaemon service will
call the api provided in this file, and the RPC client will send the
request to message-bus, and then the JobWorker can pickup the RPC message
from the message bus
jwservice.py:
run JobWorker in multi-worker mode, and establish RPC server
jwmanager.py:
all rpc messages received by the jwservice RPC server will be processed
in the jwmanager's regarding function.
jwcfg.py:
configuration and initialization for JobWorker

View File

@ -1,81 +0,0 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the job worker RPC API.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from kingbird.common import rpc
from kingbird.common.serializer import KingbirdSerializer as Serializer
from kingbird.common import topics
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('jobworker',
help='Set a version cap for messages sent to'
'jobworker services. If you plan to do a'
'live upgrade from an old version to a'
'newer version, you should set this option'
'to the old version before beginning the'
'live upgrade procedure. Only upgrading'
'to the next version is supported, so you'
'cannot skip a release for the live upgrade'
'procedure.')
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
LOG = logging.getLogger(__name__)
class JobWorkerAPI(object):
"""Client side of the job worker rpc API.
API version history:
* 1.0 - Initial version.
"""
VERSION_ALIASES = {
'mitaka': '1.0',
}
def __init__(self):
super(JobWorkerAPI, self).__init__()
target = messaging.Target(topic=topics.TOPIC_JOBWORKER, version='1.0')
upgrade_level = CONF.upgrade_levels.jobworker
version_cap = 1.0
if upgrade_level == 'auto':
version_cap = self._determine_version_cap(target)
else:
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = Serializer()
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
# to do the version compatibility for future purpose
def _determine_version_cap(self, target):
version_cap = 1.0
return version_cap
def say_hello_world_call(self, ctxt, payload):
return self.client.call(ctxt, 'say_hello_world_call', payload=payload)
def say_hello_world_cast(self, ctxt, payload):
return self.client.cast(ctxt, 'say_hello_world_cast', payload=payload)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from mock import patch
@ -26,9 +28,8 @@ from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from kingbird.api import apicfg
from kingbird.api.controllers import helloworld
from kingbird.api.controllers import quota_manager
from kingbird.common import rpc
from kingbird.jobdaemon import jdrpcapi
from kingbird.tests import base
@ -36,16 +37,6 @@ OPT_GROUP_NAME = 'keystone_authtoken'
cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token")
def fake_say_hello_world_call(self, ctxt, payload):
info_text = "say_hello_world_call, payload: %s" % payload
return {'jobdaemon': info_text}
def fake_say_hello_world_cast(self, ctxt, payload):
info_text = "say_hello_world_cast, payload: %s" % payload
return {'jobdaemon': info_text}
def fake_delete_response(self, context):
resp = jsonutils.dumps(context.to_dict())
return resp
@ -121,10 +112,6 @@ class TestRootController(KBFunctionalTest):
class TestV1Controller(KBFunctionalTest):
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_get(self):
response = self.app.get('/v1.0')
self.assertEqual(response.status_int, 200)
@ -134,9 +121,9 @@ class TestV1Controller(KBFunctionalTest):
links = json_body.get('links')
v1_link = links[0]
helloworld_link = links[1]
quota_manager_link = links[1]
self.assertEqual('self', v1_link['rel'])
self.assertEqual('helloworld', helloworld_link['rel'])
self.assertEqual('quota', quota_manager_link['rel'])
def _test_method_returns_405(self, method):
api_method = getattr(self.app, method)
@ -144,106 +131,64 @@ class TestV1Controller(KBFunctionalTest):
self.assertEqual(response.status_int, 405)
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_post(self):
self._test_method_returns_405('post')
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_put(self):
self._test_method_returns_405('put')
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_patch(self):
self._test_method_returns_405('patch')
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_delete(self):
self._test_method_returns_405('delete')
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_head(self):
self._test_method_returns_405('head')
class TestHelloworld(KBFunctionalTest):
class TestQuotaManager(KBFunctionalTest):
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_get(self):
response = self.app.get('/v1.0/helloworld')
response = self.app.get('/v1.0/quota/?arg=tenant_1')
self.assertEqual(response.status_int, 200)
self.assertIn('hello world message for', response)
self.assertIn('tenant_1', json.loads(response.text).values())
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_post(self):
response = self.app.post_json('/v1.0/helloworld',
@mock.patch.object(rpc, 'get_client')
def test_post(self, mock_client):
mock_client().call.return_value = "Post method called"
response = self.app.post_json('/v1.0/quota',
headers={'X-Tenant-Id': 'tenid'})
self.assertEqual(response.status_int, 200)
self.assertIn('## post call ##', response)
self.assertIn('jobdaemon', response)
self.assertIn("Post method called", str(response.text))
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_put(self):
response = self.app.put_json('/v1.0/helloworld',
@mock.patch.object(rpc, 'get_client')
def test_put(self, mock_client):
mock_client().call.return_value = "Put method called"
response = self.app.put_json('/v1.0/quota',
headers={'X-Tenant-Id': 'tenid'})
self.assertEqual(response.status_int, 200)
self.assertIn('## put call ##', response)
self.assertIn('jobdaemon', response)
self.assertIn("Put method called", str(response.text))
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_delete(self):
response = self.app.delete('/v1.0/helloworld',
@mock.patch.object(rpc, 'get_client')
def test_delete(self, mock_client):
response = self.app.delete('/v1.0/quota',
headers={'X-Tenant-Id': 'tenid'})
self.assertEqual(response.status_int, 200)
self.assertIn('cast example', response)
self.assertIn('check the log produced by jobdaemon', response)
self.assertIn("cast example", json.loads(response.text))
class TestHelloworldContext(KBFunctionalTest):
class TestQuotaManagerContext(KBFunctionalTest):
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
@patch.object(helloworld.HelloWorldController, '_delete_response',
@patch.object(quota_manager.QuotaManagerController, '_delete_response',
new=fake_delete_response)
def test_context_set_in_request(self):
response = self.app.delete('/v1.0/helloworld',
response = self.app.delete('/v1.0/quota',
headers={'X_Auth_Token': 'a-token',
'X_TENANT_ID': 't-id',
'X_USER_ID': 'u-id',
@ -271,12 +216,8 @@ class TestErrors(KBFunctionalTest):
self.assertEqual(response.status_int, 404)
@patch.object(rpc, 'get_client', new=mock.Mock())
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_call',
new=fake_say_hello_world_call)
@patch.object(jdrpcapi.JobDaemonAPI, 'say_hello_world_cast',
new=fake_say_hello_world_cast)
def test_bad_method(self):
response = self.app.patch('/v1.0/helloworld/123.json',
response = self.app.patch('/v1.0/quota/123.json',
expect_errors=True)
self.assertEqual(response.status_int, 405)

kingbird/tests/unit/__init__.py (0 changes, Executable file → Normal file)
View File

View File

@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kingbird.engine.listener import EngineManager
from kingbird.engine.quota_manager import QuotaManager
from kingbird.tests import base
from kingbird.tests import utils
class TestEngineManager(base.KingbirdTestCase):
def setUp(self):
super(TestEngineManager, self).setUp()
self.context = utils.dummy_context()
def test_init(self):
engine_manager = EngineManager()
self.assertIsNotNone(engine_manager)
self.assertIsInstance(engine_manager.qm, QuotaManager)
def test_say_hello_world_call(self):
payload = "test payload"
engine_manager = EngineManager()
return_value = engine_manager.say_hello_world_call(self.context,
payload)
expected_output = "payload: %s" % payload
self.assertEqual(return_value, expected_output)
def test_say_hello_world_cast(self):
payload = "test payload"
engine_manager = EngineManager()
return_value = engine_manager.say_hello_world_cast(self.context,
payload)
expected_output = {'engine': "payload: %s" % payload}
self.assertEqual(return_value, expected_output)

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kingbird.engine.quota_manager import QuotaManager
from kingbird.tests import base
class TestQuotaManager(base.KingbirdTestCase):
def setUp(self):
super(TestQuotaManager, self).setUp()
def test_init(self):
qm = QuotaManager()
self.assertIsNotNone(qm)
def test_periodic_balance_all(self):
pass

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kingbird.engine.listener import EngineManager
from kingbird.engine import service
from kingbird.tests import base
from oslo_config import cfg
CONF = cfg.CONF
class TestEngineService(base.KingbirdTestCase):
def setUp(self):
super(TestEngineService, self).setUp()
def test_init(self):
manager = EngineManager()
engine_service = service.EngineService('127.0.0.1', 'engine',
'topic-A', manager)
self.assertIsNotNone(engine_service)
@mock.patch.object(service, 'EngineService')
def test_create_service(self, mock_engine):
service.create_service()
mock_engine().start.assert_called_once_with()
@mock.patch.object(service, 'EngineService')
@mock.patch.object(service, 'srv')
def test_serve(self, mock_srv, mock_engine):
manager = EngineManager()
engine_service = service.EngineService('127.0.0.1', 'engine',
'topic-A', manager)
service.serve(engine_service, 2)
mock_srv.launch.assert_called_once_with(CONF, engine_service, workers=2)
@mock.patch.object(service, '_launcher')
def test_wait(self, mock_launcher):
service.wait()
mock_launcher.wait.assert_called_once_with()