Initial patch for Trircle stateless architecture(XJob)

Initial patch to implement the framework of Tricircle, this is
the part for XJob, establish rpc connection to message bus, and one
sample rpc call 'testrpc' from Nova-APIGW.

Blueprint: https://blueprints.launchpad.net/tricircle/+spec/implement-stateless

Change-Id: I8e538de187c075ad06e090edb7061ae6caa1deda
Signed-off-by: Chaoyi Huang <joehuang@huawei.com>
This commit is contained in:
Chaoyi Huang 2015-12-11 13:02:44 +08:00
parent 1aad4ae585
commit 3b8e791e18
24 changed files with 954 additions and 3 deletions

View File

@ -16,6 +16,11 @@
# Much of this module is based on the work of the Ironic team
# see http://git.openstack.org/cgit/openstack/ironic/tree/ironic/cmd/api.py
import eventlet
if __name__ == "__main__":
eventlet.monkey_patch()
import logging as std_logging
import sys

61
cmd/xjob.py Normal file
View File

@ -0,0 +1,61 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Much of this module is based on the work of the Ironic team
# see http://git.openstack.org/cgit/openstack/ironic/tree/ironic/cmd/api.py
import eventlet
if __name__ == "__main__":
eventlet.monkey_patch()
import logging as std_logging
import sys
from oslo_config import cfg
from oslo_log import log as logging
from tricircle.common import config
from tricircle.common.i18n import _LI
from tricircle.common.i18n import _LW
from tricircle.xjob import xservice
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def main():
config.init(xservice.common_opts, sys.argv[1:])
host = CONF.host
workers = CONF.workers
if workers < 1:
LOG.warning(_LW("Wrong worker number, worker = %(workers)s"), workers)
workers = 1
LOG.info(_LI("XJob Server on http://%(host)s with %(workers)s"),
{'host': host, 'workers': workers})
xservice.serve(xservice.create_service(), workers)
LOG.info(_LI("Configuration:"))
CONF.log_opt_values(LOG, std_logging.INFO)
xservice.wait()
if __name__ == '__main__':
main()

View File

@ -35,6 +35,7 @@ enable_plugin tricircle https://github.com/openstack/tricircle/ experiment
enable_service t-api
enable_service t-ngw
enable_service t-cgw
enable_service t-job
# Use Neutron instead of nova-network
disable_service n-net

View File

@ -179,6 +179,23 @@ function configure_tricircle_cinder_apigw {
fi
}
function configure_tricircle_xjob {
if is_service_enabled t-job ; then
echo "Configuring Tricircle xjob"
touch $TRICIRCLE_XJOB_CONF
iniset $TRICIRCLE_XJOB_CONF DEFAULT debug $ENABLE_DEBUG_LOG_LEVEL
iniset $TRICIRCLE_XJOB_CONF DEFAULT verbose True
iniset $TRICIRCLE_XJOB_CONF DEFAULT use_syslog $SYSLOG
iniset $TRICIRCLE_XJOB_CONF database connection `database_connection_url tricircle`
iniset $TRICIRCLE_XJOB_CONF oslo_concurrency lock_path $TRICIRCLE_STATE_PATH/lock
setup_colorized_logging $TRICIRCLE_XJOB_CONF DEFAULT
fi
}
if [[ "$Q_ENABLE_TRICIRCLE" == "True" ]]; then
if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
@ -193,6 +210,7 @@ if [[ "$Q_ENABLE_TRICIRCLE" == "True" ]]; then
configure_tricircle_api
configure_tricircle_nova_apigw
configure_tricircle_cinder_apigw
configure_tricircle_xjob
echo export PYTHONPATH=\$PYTHONPATH:$TRICIRCLE_DIR >> $RC_DIR/.localrc.auto
@ -222,6 +240,11 @@ if [[ "$Q_ENABLE_TRICIRCLE" == "True" ]]; then
run_process t-cgw "python $TRICIRCLE_CINDER_APIGW --config-file $TRICIRCLE_CINDER_APIGW_CONF"
fi
if is_service_enabled t-job; then
run_process t-job "python $TRICIRCLE_XJOB --config-file $TRICIRCLE_XJOB_CONF"
fi
fi
if [[ "$1" == "unstack" ]]; then
@ -237,5 +260,9 @@ if [[ "$Q_ENABLE_TRICIRCLE" == "True" ]]; then
if is_service_enabled t-cgw; then
stop_process t-cgw
fi
if is_service_enabled t-job; then
stop_process t-job
fi
fi
fi

View File

@ -35,6 +35,10 @@ TRICIRCLE_CINDER_APIGW_HOST=${TRICIRCLE_CINDER_APIGW_HOST:-$SERVICE_HOST}
TRICIRCLE_CINDER_APIGW_PORT=${TRICIRCLE_CINDER_APIGW_PORT:-19997}
TRICIRCLE_CINDER_APIGW_PROTOCOL=${TRICIRCLE_CINDER_APIGW_PROTOCOL:-$SERVICE_PROTOCOL}
# tricircle xjob
TRICIRCLE_XJOB=$TRICIRCLE_DIR/cmd/xjob.py
TRICIRCLE_XJOB_CONF=$TRICIRCLE_CONF_DIR/xjob.conf
TRICIRCLE_AUTH_CACHE_DIR=${TRICIRCLE_AUTH_CACHE_DIR:-/var/cache/tricircle}
export PYTHONPATH=$PYTHONPATH:$TRICIRCLE_DIR

View File

@ -2,7 +2,7 @@
output_file = etc/api.conf.sample
wrap_width = 79
namespace = tricircle.api
namespace = tricircle.client
namespace = tricircle.common_opts
namespace = oslo.log
namespace = oslo.messaging
namespace = oslo.policy

View File

@ -2,6 +2,7 @@
output_file = etc/cinder_apigw.conf.sample
wrap_width = 79
namespace = tricircle.cinder_apigw
namespace = tricircle.common_opts
namespace = oslo.log
namespace = oslo.messaging
namespace = oslo.policy

View File

@ -2,6 +2,7 @@
output_file = etc/nova_apigw.conf.sample
wrap_width = 79
namespace = tricircle.nova_apigw
namespace = tricircle.common_opts
namespace = oslo.log
namespace = oslo.messaging
namespace = oslo.policy

15
etc/xjob-cfg-gen.conf Normal file
View File

@ -0,0 +1,15 @@
[DEFAULT]
output_file = etc/xjob.conf.sample
wrap_width = 79
namespace = tricircle.xjob
namespace = tricircle.common_opts
namespace = oslo.log
namespace = oslo.messaging
namespace = oslo.policy
namespace = oslo.service.periodic_task
namespace = oslo.service.service
namespace = oslo.service.sslutils
namespace = oslo.db
namespace = oslo.middleware
namespace = oslo.concurrency
namespace = keystonemiddleware.auth_token

View File

@ -47,9 +47,13 @@ output_file = tricircle/locale/tricircle.pot
[entry_points]
oslo.config.opts =
tricircle.common_opts = tricircle.common.opts:list_opts
tricircle.api = tricircle.api.opts:list_opts
tricircle.client = tricircle.common.opts:list_opts
tricircle.nova_apigw = tricircle.nova_apigw.opts:list_opts
tricircle.cinder_apigw = tricircle.cinder_apigw.opts:list_opts
tricircle.xjob = tricircle.xjob.opts:list_opts

View File

@ -23,7 +23,10 @@ commands = {posargs}
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:genconfig]
commands = oslo-config-generator --config-file=etc/config-generator.conf
commands = oslo-config-generator --config-file=etc/api-cfg-gen.conf
oslo-config-generator --config-file=etc/nova_apigw-cfg-gen.conf
oslo-config-generator --config-file=etc/cinder_apigw-cfg-gen.conf
oslo-config-generator --config-file=etc/xjob-cfg-gen.conf
[testenv:docs]
commands = python setup.py build_sphinx

75
tricircle/common/baserpc.py Executable file
View File

@ -0,0 +1,75 @@
#
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# copy and modify from OpenStack Nova
"""
Base RPC client and server common to all services.
"""
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from tricircle.common import rpc
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('baseclientapi',
help='Set a version cap for messages sent to the'
'base api in any service')
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
_NAMESPACE = 'baseclientapi'
class BaseClientAPI(object):
"""Client side of the base rpc API.
API version history:
1.0 - Initial version.
"""
VERSION_ALIASES = {
# baseapi was added in the first version of Tricircle
}
def __init__(self, topic):
super(BaseClientAPI, self).__init__()
target = messaging.Target(topic=topic,
namespace=_NAMESPACE,
version='1.0')
version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.baseapi,
CONF.upgrade_levels.baseapi)
self.client = rpc.get_client(target, version_cap=version_cap)
def ping(self, context, arg, timeout=None):
arg_p = jsonutils.to_primitive(arg)
cctxt = self.client.prepare(timeout=timeout)
return cctxt.call(context, 'ping', arg=arg_p)
class BaseServerRPCAPI(object):
"""Server side of the base RPC API."""
target = messaging.Target(namespace=_NAMESPACE, version='1.0')
def __init__(self, service_name):
self.service_name = service_name
def ping(self, context, arg):
resp = {'service': self.service_name, 'arg': arg}
return jsonutils.to_primitive(resp)

View File

@ -25,6 +25,7 @@ import oslo_log.log as logging
from tricircle.common.i18n import _LI
# from tricircle import policy
from tricircle.common import rpc
from tricircle.common import version
@ -45,6 +46,8 @@ def init(opts, args, **kwargs):
_setup_logging()
rpc.init(cfg.CONF)
def _setup_logging():
"""Sets up the logging options for a log with supplied name."""

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from pecan import request
import oslo_context.context as oslo_ctx
from tricircle.db import core
@ -28,6 +30,27 @@ def get_admin_context():
return ctx
def extract_context_from_environ():
context_paras = {'auth_token': 'HTTP_X_AUTH_TOKEN',
'user': 'HTTP_X_USER_ID',
'tenant': 'HTTP_X_TENANT_ID',
'user_name': 'HTTP_X_USER_NAME',
'tenant_name': 'HTTP_X_PROJECT_NAME',
'domain': 'HTTP_X_DOMAIN_ID',
'user_domain': 'HTTP_X_USER_DOMAIN_ID',
'project_domain': 'HTTP_X_PROJECT_DOMAIN_ID',
'request_id': 'openstack.request_id'}
environ = request.environ
for key in context_paras:
context_paras[key] = environ.get(context_paras[key])
role = environ.get('HTTP_X_ROLE')
context_paras['is_admin'] = role == 'admin'
return Context(**context_paras)
class ContextBase(oslo_ctx.RequestContext):
def __init__(self, auth_token=None, user_id=None, tenant_id=None,
is_admin=False, request_id=None, overwrite=True,

View File

@ -15,8 +15,12 @@
import tricircle.common.client
# Todo: adding rpc cap negotiation configuration after first release
# import tricircle.common.xrpcapi
def list_opts():
return [
('client', tricircle.common.client.client_opts),
# ('upgrade_levels', tricircle.common.xrpcapi.rpcapi_cap_opt),
]

135
tricircle/common/rpc.py Executable file
View File

@ -0,0 +1,135 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# copy and modify from Nova
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
]
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
import tricircle.common.context
import tricircle.common.exceptions
CONF = cfg.CONF
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
tricircle.common.exceptions.__name__,
]
EXTRA_EXMODS = []
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
return tricircle.common.context.Context.from_dict(context)
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

83
tricircle/common/serializer.py Executable file
View File

@ -0,0 +1,83 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from oslo_messaging import Serializer
ATTR_NOT_SPECIFIED = object()
class Mapping(object):
def __init__(self, mapping):
self.direct_mapping = mapping
self.reverse_mapping = {}
for key, value in six.iteritems(mapping):
self.reverse_mapping[value] = key
_SINGLETON_MAPPING = Mapping({
ATTR_NOT_SPECIFIED: "@@**ATTR_NOT_SPECIFIED**@@",
})
class TricircleSerializer(Serializer):
def __init__(self, base=None):
super(TricircleSerializer, self).__init__()
self._base = base
def serialize_entity(self, context, entity):
if isinstance(entity, dict):
for key, value in six.iteritems(entity):
entity[key] = self.serialize_entity(context, value)
elif isinstance(entity, list):
for i, item in enumerate(entity):
entity[i] = self.serialize_entity(context, item)
elif entity in _SINGLETON_MAPPING.direct_mapping:
entity = _SINGLETON_MAPPING.direct_mapping[entity]
if self._base is not None:
entity = self._base.serialize_entity(context, entity)
return entity
def deserialize_entity(self, context, entity):
if isinstance(entity, dict):
for key, value in six.iteritems(entity):
entity[key] = self.deserialize_entity(context, value)
elif isinstance(entity, list):
for i, item in enumerate(entity):
entity[i] = self.deserialize_entity(context, item)
elif entity in _SINGLETON_MAPPING.reverse_mapping:
entity = _SINGLETON_MAPPING.reverse_mapping[entity]
if self._base is not None:
entity = self._base.deserialize_entity(context, entity)
return entity
def serialize_context(self, context):
if self._base is not None:
context = self._base.serialize_context(context)
return context
def deserialize_context(self, context):
if self._base is not None:
context = self._base.deserialize_context(context)
return context

20
tricircle/common/topics.py Executable file
View File

@ -0,0 +1,20 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'
TOPIC_XJOB = 'xjob'

74
tricircle/common/xrpcapi.py Executable file
View File

@ -0,0 +1,74 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the job daemon RPC API.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import rpc
from serializer import TricircleSerializer as Serializer
import topics
CONF = cfg.CONF
rpcapi_cap_opt = cfg.StrOpt('xjobapi',
default='1.0',
help='Set a version cap for messages sent to the'
'xjob api in any service')
CONF.register_opt(rpcapi_cap_opt, 'upgrade_levels')
LOG = logging.getLogger(__name__)
class XJobAPI(object):
"""Client side of the xjob rpc API.
API version history:
* 1.0 - Initial version.
"""
VERSION_ALIASES = {
'mitaka': '1.0',
}
def __init__(self):
super(XJobAPI, self).__init__()
rpc.init(CONF)
target = messaging.Target(topic=topics.TOPIC_XJOB, version='1.0')
upgrade_level = CONF.upgrade_levels.xjobapi
version_cap = 1.0
if upgrade_level == 'auto':
version_cap = self._determine_version_cap(target)
else:
version_cap = self.VERSION_ALIASES.get(upgrade_level,
upgrade_level)
serializer = Serializer()
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
# to do the version compatibility for future purpose
def _determine_version_cap(self, target):
version_cap = 1.0
return version_cap
def test_rpc(self, ctxt, payload):
return self.client.call(ctxt, 'test_rpc', payload=payload)

View File

@ -15,8 +15,14 @@
import pecan
from pecan import expose
from pecan import rest
import oslo_log.log as logging
from tricircle.common import context as ctx
from tricircle.common import xrpcapi
LOG = logging.getLogger(__name__)
@ -64,6 +70,7 @@ class V21Controller(object):
def __init__(self):
self.sub_controllers = {
"testrpc": TestRPCController()
}
for name, ctrl in self.sub_controllers.items():
@ -105,3 +112,20 @@ class V21Controller(object):
@index.when(method='PATCH')
def not_supported(self):
pecan.abort(405)
class TestRPCController(rest.RestController):
def __init__(self, *args, **kwargs):
super(TestRPCController, self).__init__(*args, **kwargs)
self.xjobapi = xrpcapi.XJobAPI()
@expose(generic=True, template='json')
def index(self):
if pecan.request.method != 'GET':
pecan.abort(405)
context = ctx.extract_context_from_environ()
payload = '#result from xjob rpc'
return self.xjobapi.test_rpc(context, payload)

0
tricircle/xjob/__init__.py Executable file
View File

23
tricircle/xjob/opts.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tricircle.xjob.xservice
def list_opts():
return [
('DEFAULT', tricircle.xjob.xservice.common_opts),
('DEFAULT', tricircle.xjob.xservice.service_opts),
]

116
tricircle/xjob/xmanager.py Executable file
View File

@ -0,0 +1,116 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from tricircle.common.i18n import _
from tricircle.common.i18n import _LI
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class PeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self):
super(PeriodicTasks, self).__init__(CONF)
class XManager(PeriodicTasks):
target = messaging.Target(version='1.0')
def __init__(self, host=None, service_name='xjob'):
LOG.debug(_('XManager initialization...'))
if not host:
host = CONF.host
self.host = host
self.service_name = service_name
# self.notifier = rpc.get_notifier(self.service_name, self.host)
self.additional_endpoints = []
super(XManager, self).__init__()
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""init_host
Hook to do additional manager initialization when one requests
the service be started. This is called before any service record
is created.
Child classes should override this method.
"""
LOG.debug(_('XManager init_host...'))
pass
def cleanup_host(self):
"""cleanup_host
Hook to do cleanup work when the service shuts down.
Child classes should override this method.
"""
LOG.debug(_('XManager cleanup_host...'))
pass
def pre_start_hook(self):
"""pre_start_hook
Hook to provide the manager the ability to do additional
start-up work before any RPC queues/consumers are created. This is
called after other initialization has succeeded and a service
record is created.
Child classes should override this method.
"""
LOG.debug(_('XManager pre_start_hook...'))
pass
def post_start_hook(self):
"""post_start_hook
Hook to provide the manager the ability to do additional
start-up work immediately after a service creates RPC consumers
and starts 'running'.
Child classes should override this method.
"""
LOG.debug(_('XManager post_start_hook...'))
pass
# rpc message endpoint handling
def test_rpc(self, ctx, payload):
LOG.info(_LI("xmanager receive payload: %s"), payload)
info_text = "xmanager receive payload: %s" % payload
return info_text

249
tricircle/xjob/xservice.py Executable file
View File

@ -0,0 +1,249 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service as srv
from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common.i18n import _LI
from tricircle.common import baserpc
from tricircle.common import context
from tricircle.common import rpc
from tricircle.common import version
from tricircle.common.serializer import TricircleSerializer as Serializer
from tricircle.common import topics
from tricircle.xjob.xmanager import XManager
_TIMER_INTERVAL = 30
_TIMER_INTERVAL_MAX = 60
common_opts = [
cfg.StrOpt('host', default='tricircle.xhost',
help=_("The host name for RPC server")),
cfg.IntOpt('workers', default=1,
help=_("number of workers")),
]
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='Seconds between nodes reporting state to datastore'),
cfg.BoolOpt('periodic_enable',
default=True,
help='Enable periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='Range of seconds to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
]
CONF = cfg.CONF
CONF.register_opts(service_opts)
LOG = logging.getLogger(__name__)
class XService(srv.Service):
"""class Service
Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager and reports
its state to the database services table.
"""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None, serializer=None,
*args, **kwargs):
super(XService, self).__init__()
self.host = host
self.binary = binary
self.topic = topic
self.manager = manager
self.rpc_server = None
self.report_interval = report_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.interval_max = periodic_interval_max
self.serializer = serializer
self.saved_args, self.saved_kwargs = args, kwargs
def start(self):
ver_str = version.version_info
LOG.info(_LI('Starting %(topic)s node (version %(version)s)'),
{'topic': self.topic, 'version': ver_str})
self.basic_config_check()
self.manager.init_host()
self.manager.pre_start_hook()
LOG.debug(_("Creating RPC server for service %s"), self.topic)
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [
self.manager,
baserpc.BaseServerRPCAPI(self.manager.service_name)
]
endpoints.extend(self.manager.additional_endpoints)
self.rpc_server = rpc.get_server(target, endpoints, self.serializer)
self.rpc_server.start()
self.manager.post_start_hook()
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
self.tg.add_dynamic_timer(self.periodic_tasks,
initial_delay=initial_delay,
periodic_interval_max=self.interval_max)
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None,
serializer=None,):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs
"""
if not host:
host = CONF.host
if not binary:
binary = os.path.basename(sys.argv[0])
if not topic:
topic = binary.rpartition('tricircle-')[2]
if not manager:
manager_cls = ('%s_manager' %
binary.rpartition('tricircle-')[2])
manager = CONF.get(manager_cls, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_enable is None:
periodic_enable = CONF.periodic_enable
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max,
serializer=serializer)
return service_obj
def kill(self):
self.stop()
def stop(self):
try:
self.rpc_server.stop()
except Exception:
pass
try:
self.manager.cleanup_host()
except Exception:
LOG.exception(_LE('Service error occurred during cleanup_host'))
pass
super(XService, self).stop()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def basic_config_check(self):
"""Perform basic config checks before starting processing."""
# Make sure the tempdir exists and is writable
# try:
# with utils.tempdir():
# pass
# except Exception as e:
# LOG.error(_LE('Temporary directory is invalid: %s'), e)
# sys.exit(1)
def create_service():
LOG.debug(_('create xjob server'))
xmanager = XManager()
xservice = XService(
host=CONF.host,
binary="xjob",
topic=topics.TOPIC_XJOB,
manager=xmanager,
periodic_enable=True,
report_interval=_TIMER_INTERVAL,
periodic_interval_max=_TIMER_INTERVAL_MAX,
serializer=Serializer()
)
xservice.start()
return xservice
_launcher = None
def serve(xservice, workers=1):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = srv.launch(CONF, xservice, workers=workers)
def wait():
_launcher.wait()