# Source: tricircle/tricircle/xjob/xservice.py (254 lines, 8.2 KiB, Python)

# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service as srv
from tricircle.common.i18n import _
from tricircle.common import baserpc
from tricircle.common import context
from tricircle.common import rpc
from tricircle.common import version
from tricircle.common import serializer as t_serializer
from tricircle.common import topics
from tricircle.xjob import xmanager as t_xmanager
# Timer settings handed to XService by create_service(): the first is passed
# as ``report_interval`` and the second as ``periodic_interval_max``.
_TIMER_INTERVAL = 30
_TIMER_INTERVAL_MAX = 60

# Options describing the xjob service itself. The list is only declared
# here. NOTE(review): it is not registered in the visible part of this
# module, so registration presumably happens in the caller -- confirm.
common_opts = [
    cfg.StrOpt(
        'host',
        default='tricircle.xhost',
        help=_("The host name for RPC server"),
    ),
    cfg.IntOpt(
        'workers',
        default=1,
        help=_("Number of workers"),
    ),
    cfg.IntOpt(
        'worker_handle_timeout',
        default=1800,
        help=_("Timeout for worker's one turn of processing, "
               "in seconds"),
    ),
    cfg.IntOpt(
        'job_run_expire',
        default=180,
        help=_("Running job is considered expires after this time, "
               "in seconds"),
    ),
    cfg.FloatOpt(
        'worker_sleep_time',
        default=0.1,
        help=_("Seconds a worker sleeps after one run in a loop"),
    ),
    cfg.IntOpt(
        'redo_time_span',
        default=172800,
        help=_("Time span in seconds, we calculate the latest job timestamp "
               "by subtracting this time span from the current timestamp, "
               "jobs created between these two timestamps will be redone"),
    ),
    cfg.BoolOpt(
        'enable_api_gateway',
        default=False,
        help=_('Whether the Nova API gateway is enabled'),
    ),
]
# Options that XService.create() falls back to when the corresponding
# argument is left as None.
service_opts = [
    cfg.IntOpt(
        'report_interval',
        default=10,
        help='Seconds between nodes reporting state to datastore',
    ),
    cfg.BoolOpt(
        'periodic_enable',
        default=True,
        help='Enable periodic tasks',
    ),
    cfg.IntOpt(
        'periodic_fuzzy_delay',
        default=60,
        help='Range of seconds to randomly delay when starting the periodic'
             ' task scheduler to reduce stampeding.'
             ' (Disable by setting to 0)',
    ),
]

# service_opts is registered on the global configuration object as soon as
# this module is imported.
CONF = cfg.CONF
CONF.register_opts(service_opts)

LOG = logging.getLogger(__name__)
class XService(srv.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It can also run the manager's periodic tasks on a timer.

    Attributes that are not found on the service itself are looked up on the
    manager (see ``__getattr__``).

    NOTE(review): ``report_interval`` is accepted and stored, but nothing in
    this class uses it to report state anywhere.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_enable=None, periodic_fuzzy_delay=None,
                 periodic_interval_max=None, serializer=None,
                 *args, **kwargs):
        """Record the service configuration; nothing is started here.

        :param host: used as the ``server`` of the RPC target in start()
        :param binary: name of the executable; only stored
        :param topic: topic of the RPC target created in start()
        :param manager: object providing the lifecycle hooks, the periodic
                        tasks and the first RPC endpoint
        :param report_interval: only stored
        :param periodic_enable: when truthy, start() schedules the
                                periodic tasks
        :param periodic_fuzzy_delay: upper bound of the random initial delay
                                     of the periodic timer; falsy disables
                                     the delay
        :param periodic_interval_max: forwarded to the dynamic timer as
                                      ``periodic_interval_max``
        :param serializer: forwarded to rpc.get_server()
        :param args: extra positional arguments, kept in ``saved_args``
        :param kwargs: extra keyword arguments, kept in ``saved_kwargs``
        """
        super(XService, self).__init__()
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager = manager
        # Created by start(); stays None until then.
        self.rpc_server = None
        self.report_interval = report_interval
        self.periodic_enable = periodic_enable
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.interval_max = periodic_interval_max
        self.serializer = serializer
        self.saved_args, self.saved_kwargs = args, kwargs

    def start(self):
        """Run the manager start hooks, start the RPC server and the timer.

        The manager's init_host() and pre_start_hook() run before the RPC
        server is created; post_start_hook() runs once it has been started.
        The periodic timer is only added when ``periodic_enable`` is truthy.
        """
        ver_str = version.version_info
        LOG.info('Starting %(topic)s node (version %(version)s)',
                 {'topic': self.topic, 'version': ver_str})

        self.manager.init_host()
        self.manager.pre_start_hook()

        LOG.debug(_("Creating RPC server for service %s"), self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [
            self.manager,
            baserpc.BaseServerRPCAPI(self.manager.service_name)
        ]
        endpoints.extend(self.manager.additional_endpoints)
        self.rpc_server = rpc.get_server(target, endpoints, self.serializer)
        self.rpc_server.start()

        self.manager.post_start_hook()

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                # Random start offset so that services started together do
                # not all run their periodic tasks at the same moment.
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            # self.tg is not set in this class; presumably the thread group
            # provided by the base class -- confirm.
            self.tg.add_dynamic_timer(self.periodic_tasks,
                                      initial_delay=initial_delay,
                                      periodic_interval_max=self.interval_max)

    def __getattr__(self, key):
        """Delegate lookups of unknown attributes to the manager.

        The manager is read through ``__dict__`` so that a service whose
        ``manager`` attribute has not been set yet does not recurse back
        into this method.

        :raises AttributeError: if no manager is set, or the manager does
                                not have the attribute either
        """
        manager = self.__dict__.get('manager', None)
        if manager is None:
            # Without this guard the lookup would run against None and the
            # error would blame 'NoneType' instead of this service.
            raise AttributeError("'%s' object has no attribute '%s'"
                                 % (type(self).__name__, key))
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_enable=None,
               periodic_fuzzy_delay=None, periodic_interval_max=None,
               serializer=None):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to the part of binary that follows the last
                      'tricircle-'; the whole binary name if that prefix
                      does not occur
        :param manager: defaults to CONF.<name>_manager, where <name> is
                        derived from binary the same way as the topic
        :param report_interval: defaults to CONF.report_interval
        :param periodic_enable: defaults to CONF.periodic_enable
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param periodic_interval_max: if set, the max time to wait between runs
        :param serializer: passed through to the constructor unchanged
        :returns: the new service object; it has not been started
        """
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(sys.argv[0])
        if not topic:
            topic = binary.rpartition('tricircle-')[2]
        if not manager:
            manager_cls = ('%s_manager' %
                           binary.rpartition('tricircle-')[2])
            # NOTE(review): this yields the raw configuration value (or
            # None), not a manager instance, while start() calls methods on
            # self.manager -- confirm that callers always pass a manager.
            manager = CONF.get(manager_cls, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_enable is None:
            periodic_enable = CONF.periodic_enable
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay

        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max,
                          serializer=serializer)

        return service_obj

    def kill(self):
        """Stop the service; identical to calling stop()."""
        self.stop()

    def stop(self):
        """Stop the RPC server, let the manager clean up, stop the service.

        Failures of the RPC server stop and of cleanup_host() are logged and
        do not prevent the remaining steps from running.
        """
        # rpc_server is still None when start() never got as far as
        # creating it; there is nothing to stop in that case.
        if self.rpc_server is not None:
            try:
                self.rpc_server.stop()
            except Exception:
                LOG.exception('Service error occurred while stopping the '
                              'RPC server')

        try:
            self.manager.cleanup_host()
        except Exception:
            LOG.exception('Service error occurred during cleanup_host')

        super(XService, self).stop()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval.

        :param raise_on_error: forwarded to the manager's periodic_tasks()
        :returns: whatever the manager's periodic_tasks() returns
        """
        ctxt = context.get_admin_context()
        return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def create_service():
    """Build the xjob XService around a new XManager and start it.

    Periodic tasks are always enabled for this service, and the timer values
    come from the module-level _TIMER_INTERVAL constants.

    :returns: the XService instance, after start() has been called on it
    """
    LOG.debug(_('create xjob server'))

    job_manager = t_xmanager.XManager()
    service_kwargs = {
        'host': CONF.host,
        'binary': "xjob",
        'topic': topics.TOPIC_XJOB,
        'manager': job_manager,
        'periodic_enable': True,
        'report_interval': _TIMER_INTERVAL,
        'periodic_interval_max': _TIMER_INTERVAL_MAX,
        'serializer': t_serializer.TricircleSerializer(),
    }

    service = XService(**service_kwargs)
    service.start()
    return service
# Process launcher created by serve(); None until serve() has been called.
_launcher = None


def serve(xservice, workers=1):
    """Launch ``xservice`` through a module-wide process launcher.

    The launcher is stored in the module-level ``_launcher`` before the
    service is launched, so a second call is rejected.

    :param xservice: the service object handed to the launcher
    :param workers: forwarded to the launcher as ``workers``
    :raises RuntimeError: if a launcher has already been created
    """
    global _launcher

    if not _launcher:
        _launcher = srv.ProcessLauncher(CONF, restart_method='mutate')
        _launcher.launch_service(xservice, workers=workers)
        return

    raise RuntimeError(_('serve() can only be called once'))
def wait():
    """Block on the process launcher that serve() created.

    :raises RuntimeError: if serve() has not been called yet, so there is
                          no launcher to wait on
    """
    if _launcher is None:
        # Without this guard the call below fails with an AttributeError on
        # None, which says nothing about the missing serve() call.
        raise RuntimeError(_('serve() must be called before wait()'))
    _launcher.wait()