introduce tacker conductor

This patch enables the message rpc framework which tacker conductor
will use. After started via tacker_conductor script, it will listen
on tacker_conductor, tacker_conductor.<host> and one fanout queues.

These three queues are intialized in oslo service module. And topic
queue tacker_conductor will be used by mistral actions.

How to test it:
  document will be written after the whole vim monitor is ready.
To test this patch, after 'python setup.py develop', run
'tacker-conductor --config-file /etc/tacker/tacker.conf', and then
use 'rabbitmqctl list_queues' which will list three queues starting
with 'tacker_conductor'.

DocImpact
Implements: blueprint refactor-vim-monitor

Change-Id: Ide80674099d384aed1a98b22928e9fc6bcfa3ff9
This commit is contained in:
jing.liuqing 2017-06-08 16:20:19 +08:00
parent 51ede7484a
commit e58b01a545
10 changed files with 515 additions and 41 deletions

View File

@ -37,7 +37,8 @@ setup-hooks =
[entry_points]
console_scripts =
tacker-db-manage = tacker.db.migration.cli:main
tacker-server = tacker.cmd.server:main
tacker-server = tacker.cmd.eventlet.tacker_server:main
tacker-conductor = tacker.cmd.eventlet.conductor:main
tacker-rootwrap = oslo.rootwrap.cmd:main
tacker.service_plugins =
dummy = tacker.tests.unit.dummy_plugin:DummyServicePlugin

View File

@ -0,0 +1,17 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.common import eventlet_utils
eventlet_utils.monkey_patch()

View File

@ -0,0 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.conductor import conductor_server
def main():
conductor_server.main()

View File

@ -20,8 +20,6 @@
import sys
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
import oslo_i18n
from oslo_service import service as common_service
@ -52,7 +50,3 @@ def main():
pass
except RuntimeError as e:
sys.exit(_("ERROR: %s") % e)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,26 @@
# copyright (c) 2015 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import eventlet
from oslo_utils import importutils
def monkey_patch():
eventlet.monkey_patch()
if os.name != 'nt':
p_c_e = importutils.import_module('pyroute2.config.eventlet')
p_c_e.eventlet_config()

View File

@ -13,15 +13,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import random
import time
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging.rpc import dispatcher
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_utils import excutils
from tacker.common import exceptions
from tacker import context
LOG = logging.getLogger(__name__)
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
@ -30,29 +41,34 @@ ALLOWED_EXMODS = [
EXTRA_EXMODS = []
TRANSPORT_ALIASES = {
'tacker.rpc.impl_fake': 'fake',
'tacker.rpc.impl_qpid': 'qpid',
'tacker.rpc.impl_kombu': 'rabbit',
'tacker.rpc.impl_zmq': 'zmq',
}
# NOTE(salv-orlando): I am afraid this is a global variable. While not ideal,
# they're however widely used throughout the code base. It should be set to
# true if the RPC server is not running in the current process space. This
# will prevent get_connection from creating connections to the AMQP server
RPC_DISABLED = False
def init(conf):
global TRANSPORT, NOTIFIER
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = oslo_messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
NOTIFIER = oslo_messaging.Notifier(TRANSPORT)
allowed_remote_exmods=exmods)
NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
conf, allowed_remote_exmods=exmods)
serializer = RequestContextSerializer()
NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFICATION_TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
NOTIFICATION_TRANSPORT.cleanup()
_ContextWrapper.reset_timeouts()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def add_extra_exmods(*args):
@ -67,6 +83,128 @@ def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
def _get_default_method_timeout():
return TRANSPORT.conf.rpc_response_timeout
def _get_default_method_timeouts():
return collections.defaultdict(_get_default_method_timeout)
class _ContextWrapper(object):
"""Wraps oslo messaging contexts to set the timeout for calls.
This intercepts RPC calls and sets the timeout value to the globally
adapting value for each method. An oslo messaging timeout results in
a doubling of the timeout value for the method on which it timed out.
There currently is no logic to reduce the timeout since busy Tacker
servers are more frequently the cause of timeouts rather than lost
messages.
"""
_METHOD_TIMEOUTS = _get_default_method_timeouts()
_max_timeout = None
@classmethod
def reset_timeouts(cls):
# restore the original default timeout factory
cls._METHOD_TIMEOUTS = _get_default_method_timeouts()
cls._max_timeout = None
@classmethod
def get_max_timeout(cls):
return cls._max_timeout or _get_default_method_timeout() * 10
@classmethod
def set_max_timeout(cls, max_timeout):
if max_timeout < cls.get_max_timeout():
cls._METHOD_TIMEOUTS = collections.defaultdict(
lambda: max_timeout, **{
k: min(v, max_timeout)
for k, v in cls._METHOD_TIMEOUTS.items()
})
cls._max_timeout = max_timeout
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def call(self, ctxt, method, **kwargs):
# two methods with the same name in different namespaces should
# be tracked independently
if self._original_context.target.namespace:
scoped_method = '%s.%s' % (self._original_context.target.namespace,
method)
else:
scoped_method = method
# set the timeout from the global method timeout tracker for this
# method
self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method]
try:
return self._original_context.call(ctxt, method, **kwargs)
except oslo_messaging.MessagingTimeout:
with excutils.save_and_reraise_exception():
wait = random.uniform(
0,
min(self._METHOD_TIMEOUTS[scoped_method],
TRANSPORT.conf.rpc_response_timeout)
)
LOG.error("Timeout in RPC method %(method)s. Waiting for "
"%(wait)s seconds before next attempt. If the "
"server is not down, consider increasing the "
"rpc_response_timeout option as message "
"server(s) may be overloaded and unable to "
"respond quickly enough.",
{'wait': int(round(wait)), 'method': scoped_method})
new_timeout = min(
self._original_context.timeout * 2, self.get_max_timeout())
if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
LOG.warning("Increasing timeout for %(method)s calls "
"to %(new)s seconds. Restart the client to "
"restore it to the default value.",
{'method': scoped_method, 'new': new_timeout})
self._METHOD_TIMEOUTS[scoped_method] = new_timeout
time.sleep(wait)
class BackingOffClient(oslo_messaging.RPCClient):
"""An oslo messaging RPC Client that implements a timeout backoff.
This has all of the same interfaces as oslo_messaging.RPCClient but
if the timeout parameter is not specified, the _ContextWrapper returned
will track when call timeout exceptions occur and exponentially increase
the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
# don't enclose Contexts that explicitly set a timeout
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
@staticmethod
def set_max_timeout(max_timeout):
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
_ContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return BackingOffClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
access_policy = dispatcher.DefaultRPCAccessPolicy
return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
'eventlet', serializer,
access_policy=access_policy)
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
@ -74,14 +212,11 @@ def get_notifier(service=None, host=None, publisher_id=None):
return NOTIFIER.prepare(publisher_id=publisher_id)
class PluginRpcSerializer(om_serializer.Serializer):
"""Serializer.
class RequestContextSerializer(om_serializer.Serializer):
"""convert RPC common context int tacker Context."""
This serializer is used to convert RPC common context into
Tacker Context.
"""
def __init__(self, base):
super(PluginRpcSerializer, self).__init__()
def __init__(self, base=None):
super(RequestContextSerializer, self).__init__()
self._base = base
def serialize_entity(self, ctxt, entity):
@ -95,15 +230,101 @@ class PluginRpcSerializer(om_serializer.Serializer):
return self._base.deserialize_entity(ctxt, entity)
def serialize_context(self, ctxt):
return ctxt.to_dict()
_context = ctxt.to_dict()
return _context
def deserialize_context(self, ctxt):
rpc_ctxt_dict = ctxt.copy()
user_id = rpc_ctxt_dict.pop('user_id', None)
if not user_id:
user_id = rpc_ctxt_dict.pop('user', None)
tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
if not tenant_id:
tenant_id = rpc_ctxt_dict.pop('project_id', None)
return context.Context(user_id, tenant_id,
load_admin_roles=False, **rpc_ctxt_dict)
return context.Context.from_dict(rpc_ctxt_dict)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = create_connection()
LOG.debug("Creating Consumer connection for Service %s",
self.topic)
endpoints = [self.manager]
self.conn.create_consumer(self.topic, endpoints)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in threads
self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
def close(self):
for server in self.servers:
server.stop()
for server in self.servers:
server.wait()
class VoidConnection(object):
def create_consumer(self, topic, endpoints, fanout=False):
pass
def consume_in_threads(self):
pass
def close(self):
pass
# functions
def create_connection():
# NOTE(salv-orlando): This is a clever interpretation of the factory design
# patter aimed at preventing plugins from initializing RPC servers upon
# initialization when they are running in the REST over HTTP API server.
# The educated reader will perfectly be able that this a fairly dirty hack
# to avoid having to change the initialization process of every plugin.
if RPC_DISABLED:
return VoidConnection()
return Connection()

View File

View File

@ -0,0 +1,65 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from tacker import manager
from tacker import service as tacker_service
from tacker import version
LOG = logging.getLogger(__name__)
class Conductor(manager.Manager):
def __init__(self, host, conf=None):
if conf:
self.conf = conf
else:
self.conf = cfg.CONF
super(Conductor, self).__init__(host=self.conf.host)
def register_opts(conf):
pass
def init(args, **kwargs):
cfg.CONF(args=args, project='tacker',
version='%%prog %s' % version.version_info.release_string(),
**kwargs)
# FIXME(ihrachys): if import is put in global, circular import
# failure occurs
from tacker.common import rpc as n_rpc
n_rpc.init(cfg.CONF)
def main(manager='tacker.conductor.conductor_server.Conductor'):
register_opts(cfg.CONF)
init(sys.argv[1:])
logging.setup(cfg.CONF, "tacker")
oslo_messaging.set_transport_defaults(control_exchange='tacker')
logging.setup(cfg.CONF, "tacker")
cfg.CONF.log_opt_values(LOG, logging.DEBUG)
server = tacker_service.Service.create(
binary='tacker-conductor',
topic='tacker_conductor',
manager=manager)
service.launch(cfg.CONF, server).wait()

View File

@ -15,6 +15,7 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import periodic_task
from tacker.common import utils
@ -26,13 +27,14 @@ LOG = logging.getLogger(__name__)
class Manager(periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
target = oslo_messaging.Target(version='1.0')
def __init__(self, host=None):
if not host:
host = cfg.CONF.host
self.host = host
super(Manager, self).__init__()
conf = getattr(self, "conf", cfg.CONF)
super(Manager, self).__init__(conf)
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)

View File

@ -13,18 +13,29 @@
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import os
import random
import logging as std_logging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_service import service
from oslo_utils import excutils
from oslo_utils import importutils
from tacker.common import config
from tacker.common import rpc as n_rpc
from tacker import context
from tacker import wsgi
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help=_('Seconds between running components report states')),
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
@ -95,8 +106,8 @@ def serve_wsgi(cls):
service = cls.create()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_('Unrecoverable error: please check log '
'for details.'))
LOG.exception('Unrecoverable error: please check log '
'for details.')
return service
@ -104,14 +115,134 @@ def serve_wsgi(cls):
def _run_wsgi(app_name):
app = config.load_paste_app(app_name)
if not app:
LOG.error(_('No known API applications configured.'))
LOG.error('No known API applications configured.')
return
server = wsgi.Server("Tacker")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Tacker service started, listening on %(host)s:%(port)s"),
LOG.info("Tacker service started, listening on %(host)s:%(port)s",
{'host': cfg.CONF.bind_host,
'port': cfg.CONF.bind_port})
return server
class Service(n_rpc.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager.
"""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
*args, **kwargs):
self.binary = binary
self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
super(Service, self).__init__(host, topic, manager=self.manager)
def start(self):
self.manager.init_host()
super(Service, self).start()
if self.report_interval:
pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
pulse.start(interval=self.report_interval,
initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = loopingcall.FixedIntervalLoopingCall(
self.periodic_tasks)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
self.timers.append(periodic)
self.manager.after_start()
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
return getattr(manager, key)
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
"""Instantiates class and passes back application object.
:param host: defaults to cfg.CONF.host
:param binary: defaults to basename of executable
:param topic: defaults to bin_name - 'tacker-' part
:param manager: defaults to cfg.CONF.<topic>_manager
:param report_interval: defaults to cfg.CONF.report_interval
:param periodic_interval: defaults to cfg.CONF.periodic_interval
:param periodic_fuzzy_delay: defaults to cfg.CONF.periodic_fuzzy_delay
"""
if not host:
host = cfg.CONF.host
if not binary:
binary = os.path.basename(inspect.stack()[-1][1])
if not topic:
topic = binary.rpartition('neutron-')[2]
topic = topic.replace("-", "_")
if not manager:
manager = cfg.CONF.get('%s_manager' % topic, None)
if report_interval is None:
report_interval = cfg.CONF.report_interval
if periodic_interval is None:
periodic_interval = cfg.CONF.periodic_interval
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = cfg.CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
return service_obj
def kill(self):
"""Destroy the service object."""
self.stop()
def stop(self):
super(Service, self).stop()
for x in self.timers:
try:
x.stop()
except Exception:
LOG.exception("Exception occurs when timer stops")
self.timers = []
def wait(self):
super(Service, self).wait()
for x in self.timers:
try:
x.wait()
except Exception:
LOG.exception("Exception occurs when waiting for timer")
def reset(self):
config.reset_service()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
def report_state(self):
"""Update the state of this service."""
# Todo(gongysh) report state to neutron server
pass