diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index 6b5ac5ac715..67f855bd4b3 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -21,6 +21,7 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall from oslo_utils import importutils from neutron.agent.linux import dhcp @@ -36,7 +37,6 @@ from neutron.common import utils from neutron import context from neutron.i18n import _LE, _LI, _LW from neutron import manager -from neutron.openstack.common import loopingcall LOG = logging.getLogger(__name__) diff --git a/neutron/agent/dhcp_agent.py b/neutron/agent/dhcp_agent.py index 8b7bbae31ad..845259a2d5f 100644 --- a/neutron/agent/dhcp_agent.py +++ b/neutron/agent/dhcp_agent.py @@ -17,6 +17,7 @@ import sys from oslo_config import cfg +from oslo_service import service from neutron.agent.common import config from neutron.agent.dhcp import config as dhcp_config @@ -24,7 +25,6 @@ from neutron.agent.linux import interface from neutron.agent.metadata import config as metadata_config from neutron.common import config as common_config from neutron.common import topics -from neutron.openstack.common import service from neutron import service as neutron_service @@ -49,4 +49,4 @@ def main(): topic=topics.DHCP_AGENT, report_interval=cfg.CONF.AGENT.report_interval, manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport') - service.launch(server).wait() + service.launch(cfg.CONF, server).wait() diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index a1aec148843..71f64ccca3e 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -18,6 +18,8 @@ import netaddr from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall +from oslo_service import periodic_task from oslo_utils import excutils from oslo_utils import importutils from oslo_utils import timeutils @@ -47,8 +49,6 @@ from neutron.common import topics from neutron import context as n_context from neutron.i18n import _LE, _LI, _LW from neutron import manager -from neutron.openstack.common import loopingcall -from neutron.openstack.common import periodic_task try: from neutron_fwaas.services.firewall.agents.l3reference \ diff --git a/neutron/agent/l3_agent.py b/neutron/agent/l3_agent.py index 12c152d0536..bee060181c9 100644 --- a/neutron/agent/l3_agent.py +++ b/neutron/agent/l3_agent.py @@ -17,6 +17,7 @@ import sys from oslo_config import cfg +from oslo_service import service from neutron.agent.common import config from neutron.agent.l3 import config as l3_config @@ -26,7 +27,6 @@ from neutron.agent.linux import interface from neutron.agent.metadata import config as metadata_config from neutron.common import config as common_config from neutron.common import topics -from neutron.openstack.common import service from neutron import service as neutron_service @@ -51,4 +51,4 @@ def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'): topic=topics.L3_AGENT, report_interval=cfg.CONF.AGENT.report_interval, manager=manager) - service.launch(server).wait() + service.launch(cfg.CONF, server).wait() diff --git a/neutron/agent/metadata/agent.py b/neutron/agent/metadata/agent.py index 769d8039bc0..ee5b169670b 100644 --- a/neutron/agent/metadata/agent.py +++ b/neutron/agent/metadata/agent.py @@ -20,6 +20,7 @@ from neutronclient.v2_0 import client from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall import six import six.moves.urllib.parse as urlparse import webob @@ -34,7 +35,6 @@ from neutron.common import utils from neutron import context from neutron.i18n import _LE, _LW from neutron.openstack.common.cache import cache -from neutron.openstack.common import loopingcall LOG = logging.getLogger(__name__) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 8c4df963fbc..6fe39842b7e 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -18,10 +18,10 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging from oslo_messaging import serializer as om_serializer +from oslo_service import service from neutron.common import exceptions from neutron import context -from neutron.openstack.common import service LOG = logging.getLogger(__name__) diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index b9d9c11dbe5..3b682f1e325 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -19,6 +19,7 @@ import time from oslo_config import cfg from oslo_log import log as logging +from oslo_service import loopingcall from oslo_utils import timeutils import sqlalchemy as sa from sqlalchemy import orm @@ -32,7 +33,6 @@ from neutron.db import model_base from neutron.extensions import agent as ext_agent from neutron.extensions import dhcpagentscheduler from neutron.i18n import _LE, _LI, _LW -from neutron.openstack.common import loopingcall LOG = logging.getLogger(__name__) diff --git a/neutron/manager.py b/neutron/manager.py index fe46a5aea75..50beae09868 100644 --- a/neutron/manager.py +++ b/neutron/manager.py @@ -18,12 +18,12 @@ import weakref from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import periodic_task from oslo_utils import importutils import six from neutron.common import utils from neutron.i18n import _LE, _LI -from neutron.openstack.common import periodic_task from neutron.plugins.common import constants from stevedore import driver @@ -43,7 +43,8 @@ class Manager(periodic_task.PeriodicTasks): if not host: host = cfg.CONF.host self.host = host - super(Manager, self).__init__() + conf = getattr(self, "conf", cfg.CONF) + super(Manager, self).__init__(conf) def periodic_tasks(self, context, raise_on_error=False): self.run_periodic_tasks(context, raise_on_error=raise_on_error) diff --git a/neutron/openstack/common/eventlet_backdoor.py b/neutron/openstack/common/eventlet_backdoor.py deleted file mode 100644 index 113500339ed..00000000000 --- a/neutron/openstack/common/eventlet_backdoor.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (c) 2012 OpenStack Foundation. -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from __future__ import print_function - -import copy -import errno -import gc -import logging -import os -import pprint -import socket -import sys -import traceback - -import eventlet.backdoor -import greenlet -from oslo_config import cfg - -from neutron.openstack.common._i18n import _LI - -help_for_backdoor_port = ( - "Acceptable values are 0, , and :, where 0 results " - "in listening on a random tcp port number; results in listening " - "on the specified port number (and not enabling backdoor if that port " - "is in use); and : results in listening on the smallest " - "unused port number within the specified range of port numbers. The " - "chosen port is displayed in the service's log file.") -eventlet_backdoor_opts = [ - cfg.StrOpt('backdoor_port', - help="Enable eventlet backdoor. %s" % help_for_backdoor_port) -] - -CONF = cfg.CONF -CONF.register_opts(eventlet_backdoor_opts) -LOG = logging.getLogger(__name__) - - -def list_opts(): - """Entry point for oslo-config-generator. - """ - return [(None, copy.deepcopy(eventlet_backdoor_opts))] - - -class EventletBackdoorConfigValueError(Exception): - def __init__(self, port_range, help_msg, ex): - msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' - '%(help)s' % - {'range': port_range, 'ex': ex, 'help': help_msg}) - super(EventletBackdoorConfigValueError, self).__init__(msg) - self.port_range = port_range - - -def _dont_use_this(): - print("Don't use this, just disconnect instead") - - -def _find_objects(t): - return [o for o in gc.get_objects() if isinstance(o, t)] - - -def _print_greenthreads(): - for i, gt in enumerate(_find_objects(greenlet.greenlet)): - print(i, gt) - traceback.print_stack(gt.gr_frame) - print() - - -def _print_nativethreads(): - for threadId, stack in sys._current_frames().items(): - print(threadId) - traceback.print_stack(stack) - print() - - -def _parse_port_range(port_range): - if ':' not in port_range: - start, end = port_range, port_range - else: - start, end = port_range.split(':', 1) - try: - start, end = int(start), int(end) - if end < start: - raise ValueError - return start, end - except ValueError as ex: - raise EventletBackdoorConfigValueError(port_range, ex, - help_for_backdoor_port) - - -def _listen(host, start_port, end_port, listen_func): - try_port = start_port - while True: - try: - return listen_func((host, try_port)) - except socket.error as exc: - if (exc.errno != errno.EADDRINUSE or - try_port >= end_port): - raise - try_port += 1 - - -def initialize_if_enabled(): - backdoor_locals = { - 'exit': _dont_use_this, # So we don't exit the entire process - 'quit': _dont_use_this, # So we don't exit the entire process - 'fo': _find_objects, - 'pgt': _print_greenthreads, - 'pnt': _print_nativethreads, - } - - if CONF.backdoor_port is None: - return None - - start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) - - # NOTE(johannes): The standard sys.displayhook will print the value of - # the last expression and set it to __builtin__._, which overwrites - # the __builtin__._ that gettext sets. Let's switch to using pprint - # since it won't interact poorly with gettext, and it's easier to - # read the output too. - def displayhook(val): - if val is not None: - pprint.pprint(val) - sys.displayhook = displayhook - - sock = _listen('localhost', start_port, end_port, eventlet.listen) - - # In the case of backdoor port being zero, a port number is assigned by - # listen(). In any case, pull the port number out here. - port = sock.getsockname()[1] - LOG.info( - _LI('Eventlet backdoor listening on %(port)s for process %(pid)d'), - {'port': port, 'pid': os.getpid()} - ) - eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, - locals=backdoor_locals) - return port diff --git a/neutron/openstack/common/loopingcall.py b/neutron/openstack/common/loopingcall.py deleted file mode 100644 index ab28ca1c8e7..00000000000 --- a/neutron/openstack/common/loopingcall.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2011 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import sys -import time - -from eventlet import event -from eventlet import greenthread - -from neutron.openstack.common._i18n import _LE, _LW - -LOG = logging.getLogger(__name__) - -# NOTE(zyluo): This lambda function was declared to avoid mocking collisions -# with time.time() called in the standard logging module -# during unittests. -_ts = lambda: time.time() - - -class LoopingCallDone(Exception): - """Exception to break out and stop a LoopingCallBase. - - The poll-function passed to LoopingCallBase can raise this exception to - break out of the loop normally. This is somewhat analogous to - StopIteration. - - An optional return-value can be included as the argument to the exception; - this return-value will be returned by LoopingCallBase.wait() - - """ - - def __init__(self, retvalue=True): - """:param retvalue: Value that LoopingCallBase.wait() should return.""" - self.retvalue = retvalue - - -class LoopingCallBase(object): - def __init__(self, f=None, *args, **kw): - self.args = args - self.kw = kw - self.f = f - self._running = False - self.done = None - - def stop(self): - self._running = False - - def wait(self): - return self.done.wait() - - -class FixedIntervalLoopingCall(LoopingCallBase): - """A fixed interval looping call.""" - - def start(self, interval, initial_delay=None): - self._running = True - done = event.Event() - - def _inner(): - if initial_delay: - greenthread.sleep(initial_delay) - - try: - while self._running: - start = _ts() - self.f(*self.args, **self.kw) - end = _ts() - if not self._running: - break - delay = end - start - interval - if delay > 0: - LOG.warning(_LW('task %(func_name)r run outlasted ' - 'interval by %(delay).2f sec'), - {'func_name': self.f, 'delay': delay}) - greenthread.sleep(-delay if delay < 0 else 0) - except LoopingCallDone as e: - self.stop() - done.send(e.retvalue) - except Exception: - LOG.exception(_LE('in fixed duration looping call')) - done.send_exception(*sys.exc_info()) - return - else: - done.send(True) - - self.done = done - - greenthread.spawn_n(_inner) - return self.done - - -class DynamicLoopingCall(LoopingCallBase): - """A looping call which sleeps until the next known event. - - The function called should return how long to sleep for before being - called again. - """ - - def start(self, initial_delay=None, periodic_interval_max=None): - self._running = True - done = event.Event() - - def _inner(): - if initial_delay: - greenthread.sleep(initial_delay) - - try: - while self._running: - idle = self.f(*self.args, **self.kw) - if not self._running: - break - - if periodic_interval_max is not None: - idle = min(idle, periodic_interval_max) - LOG.debug('Dynamic looping call %(func_name)r sleeping ' - 'for %(idle).02f seconds', - {'func_name': self.f, 'idle': idle}) - greenthread.sleep(idle) - except LoopingCallDone as e: - self.stop() - done.send(e.retvalue) - except Exception: - LOG.exception(_LE('in dynamic looping call')) - done.send_exception(*sys.exc_info()) - return - else: - done.send(True) - - self.done = done - - greenthread.spawn(_inner) - return self.done diff --git a/neutron/openstack/common/periodic_task.py b/neutron/openstack/common/periodic_task.py deleted file mode 100644 index 633a1467116..00000000000 --- a/neutron/openstack/common/periodic_task.py +++ /dev/null @@ -1,232 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import logging -import random -import time - -from oslo_config import cfg -import six - -from neutron.openstack.common._i18n import _, _LE, _LI - - -periodic_opts = [ - cfg.BoolOpt('run_external_periodic_tasks', - default=True, - help='Some periodic tasks can be run in a separate process. ' - 'Should we run them here?'), -] - -CONF = cfg.CONF -CONF.register_opts(periodic_opts) - -LOG = logging.getLogger(__name__) - -DEFAULT_INTERVAL = 60.0 - - -def list_opts(): - """Entry point for oslo-config-generator.""" - return [(None, copy.deepcopy(periodic_opts))] - - -class InvalidPeriodicTaskArg(Exception): - message = _("Unexpected argument for periodic task creation: %(arg)s.") - - -def periodic_task(*args, **kwargs): - """Decorator to indicate that a method is a periodic task. - - This decorator can be used in two ways: - - 1. Without arguments '@periodic_task', this will be run on the default - interval of 60 seconds. - - 2. With arguments: - @periodic_task(spacing=N [, run_immediately=[True|False]] - [, name=[None|"string"]) - this will be run on approximately every N seconds. If this number is - negative the periodic task will be disabled. If the run_immediately - argument is provided and has a value of 'True', the first run of the - task will be shortly after task scheduler starts. If - run_immediately is omitted or set to 'False', the first time the - task runs will be approximately N seconds after the task scheduler - starts. If name is not provided, __name__ of function is used. - """ - def decorator(f): - # Test for old style invocation - if 'ticks_between_runs' in kwargs: - raise InvalidPeriodicTaskArg(arg='ticks_between_runs') - - # Control if run at all - f._periodic_task = True - f._periodic_external_ok = kwargs.pop('external_process_ok', False) - if f._periodic_external_ok and not CONF.run_external_periodic_tasks: - f._periodic_enabled = False - else: - f._periodic_enabled = kwargs.pop('enabled', True) - f._periodic_name = kwargs.pop('name', f.__name__) - - # Control frequency - f._periodic_spacing = kwargs.pop('spacing', 0) - f._periodic_immediate = kwargs.pop('run_immediately', False) - if f._periodic_immediate: - f._periodic_last_run = None - else: - f._periodic_last_run = time.time() - return f - - # NOTE(sirp): The `if` is necessary to allow the decorator to be used with - # and without parenthesis. - # - # In the 'with-parenthesis' case (with kwargs present), this function needs - # to return a decorator function since the interpreter will invoke it like: - # - # periodic_task(*args, **kwargs)(f) - # - # In the 'without-parenthesis' case, the original function will be passed - # in as the first argument, like: - # - # periodic_task(f) - if kwargs: - return decorator - else: - return decorator(args[0]) - - -class _PeriodicTasksMeta(type): - def _add_periodic_task(cls, task): - """Add a periodic task to the list of periodic tasks. - - The task should already be decorated by @periodic_task. - - :return: whether task was actually enabled - """ - name = task._periodic_name - - if task._periodic_spacing < 0: - LOG.info(_LI('Skipping periodic task %(task)s because ' - 'its interval is negative'), - {'task': name}) - return False - if not task._periodic_enabled: - LOG.info(_LI('Skipping periodic task %(task)s because ' - 'it is disabled'), - {'task': name}) - return False - - # A periodic spacing of zero indicates that this task should - # be run on the default interval to avoid running too - # frequently. - if task._periodic_spacing == 0: - task._periodic_spacing = DEFAULT_INTERVAL - - cls._periodic_tasks.append((name, task)) - cls._periodic_spacing[name] = task._periodic_spacing - return True - - def __init__(cls, names, bases, dict_): - """Metaclass that allows us to collect decorated periodic tasks.""" - super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) - - # NOTE(sirp): if the attribute is not present then we must be the base - # class, so, go ahead an initialize it. If the attribute is present, - # then we're a subclass so make a copy of it so we don't step on our - # parent's toes. - try: - cls._periodic_tasks = cls._periodic_tasks[:] - except AttributeError: - cls._periodic_tasks = [] - - try: - cls._periodic_spacing = cls._periodic_spacing.copy() - except AttributeError: - cls._periodic_spacing = {} - - for value in cls.__dict__.values(): - if getattr(value, '_periodic_task', False): - cls._add_periodic_task(value) - - -def _nearest_boundary(last_run, spacing): - """Find nearest boundary which is in the past, which is a multiple of the - spacing with the last run as an offset. - - Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24, - 31, 38... - - 0% to 5% of the spacing value will be added to this value to ensure tasks - do not synchronize. This jitter is rounded to the nearest second, this - means that spacings smaller than 20 seconds will not have jitter. - """ - current_time = time.time() - if last_run is None: - return current_time - delta = current_time - last_run - offset = delta % spacing - # Add up to 5% jitter - jitter = int(spacing * (random.random() / 20)) - return current_time - offset + jitter - - -@six.add_metaclass(_PeriodicTasksMeta) -class PeriodicTasks(object): - def __init__(self): - super(PeriodicTasks, self).__init__() - self._periodic_last_run = {} - for name, task in self._periodic_tasks: - self._periodic_last_run[name] = task._periodic_last_run - - def add_periodic_task(self, task): - """Add a periodic task to the list of periodic tasks. - - The task should already be decorated by @periodic_task. - """ - if self.__class__._add_periodic_task(task): - self._periodic_last_run[task._periodic_name] = ( - task._periodic_last_run) - - def run_periodic_tasks(self, context, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - idle_for = DEFAULT_INTERVAL - for task_name, task in self._periodic_tasks: - full_task_name = '.'.join([self.__class__.__name__, task_name]) - - spacing = self._periodic_spacing[task_name] - last_run = self._periodic_last_run[task_name] - - # Check if due, if not skip - idle_for = min(idle_for, spacing) - if last_run is not None: - delta = last_run + spacing - time.time() - if delta > 0: - idle_for = min(idle_for, delta) - continue - - LOG.debug("Running periodic task %(full_task_name)s", - {"full_task_name": full_task_name}) - self._periodic_last_run[task_name] = _nearest_boundary( - last_run, spacing) - - try: - task(self, context) - except Exception: - if raise_on_error: - raise - LOG.exception(_LE("Error during %(full_task_name)s"), - {"full_task_name": full_task_name}) - time.sleep(0) - - return idle_for diff --git a/neutron/openstack/common/service.py b/neutron/openstack/common/service.py deleted file mode 100644 index b757c008e22..00000000000 --- a/neutron/openstack/common/service.py +++ /dev/null @@ -1,507 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2011 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Generic Node base class for all workers that run on hosts.""" - -import errno -import io -import logging -import os -import random -import signal -import sys -import time - -import eventlet -from eventlet import event -from oslo_config import cfg - -from neutron.openstack.common import eventlet_backdoor -from neutron.openstack.common._i18n import _LE, _LI, _LW -from neutron.openstack.common import systemd -from neutron.openstack.common import threadgroup - - -CONF = cfg.CONF -LOG = logging.getLogger(__name__) - - -def _sighup_supported(): - return hasattr(signal, 'SIGHUP') - - -def _is_daemon(): - # The process group for a foreground process will match the - # process group of the controlling terminal. If those values do - # not match, or ioctl() fails on the stdout file handle, we assume - # the process is running in the background as a daemon. - # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics - try: - is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) - except io.UnsupportedOperation: - # Could not get the fileno for stdout, so we must be a daemon. - is_daemon = True - except OSError as err: - if err.errno == errno.ENOTTY: - # Assume we are a daemon because there is no terminal. - is_daemon = True - else: - raise - return is_daemon - - -def _is_sighup_and_daemon(signo): - if not (_sighup_supported() and signo == signal.SIGHUP): - # Avoid checking if we are a daemon, because the signal isn't - # SIGHUP. - return False - return _is_daemon() - - -def _signo_to_signame(signo): - signals = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'} - if _sighup_supported(): - signals[signal.SIGHUP] = 'SIGHUP' - return signals[signo] - - -def _set_signals_handler(handler): - signal.signal(signal.SIGTERM, handler) - signal.signal(signal.SIGINT, handler) - if _sighup_supported(): - signal.signal(signal.SIGHUP, handler) - - -class Launcher(object): - """Launch one or more services and wait for them to complete.""" - - def __init__(self): - """Initialize the service launcher. - - :returns: None - - """ - self.services = Services() - self.backdoor_port = eventlet_backdoor.initialize_if_enabled() - - def launch_service(self, service): - """Load and start the given service. - - :param service: The service you would like to start. - :returns: None - - """ - service.backdoor_port = self.backdoor_port - self.services.add(service) - - def stop(self): - """Stop all services which are currently running. - - :returns: None - - """ - self.services.stop() - - def wait(self): - """Waits until all services have been stopped, and then returns. - - :returns: None - - """ - self.services.wait() - - def restart(self): - """Reload config files and restart service. - - :returns: None - - """ - cfg.CONF.reload_config_files() - self.services.restart() - - -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super(SignalExit, self).__init__(exccode) - self.signo = signo - - -class ServiceLauncher(Launcher): - def _handle_signal(self, signo, frame): - # Allow the process to be killed again and die from natural causes - _set_signals_handler(signal.SIG_DFL) - raise SignalExit(signo) - - def handle_signal(self): - _set_signals_handler(self._handle_signal) - - def _wait_for_exit_or_signal(self, ready_callback=None): - status = None - signo = 0 - - LOG.debug('Full set of CONF:') - CONF.log_opt_values(LOG, logging.DEBUG) - - try: - if ready_callback: - ready_callback() - super(ServiceLauncher, self).wait() - except SignalExit as exc: - signame = _signo_to_signame(exc.signo) - LOG.info(_LI('Caught %s, exiting'), signame) - status = exc.code - signo = exc.signo - except SystemExit as exc: - status = exc.code - finally: - self.stop() - - return status, signo - - def wait(self, ready_callback=None): - systemd.notify_once() - while True: - self.handle_signal() - status, signo = self._wait_for_exit_or_signal(ready_callback) - if not _is_sighup_and_daemon(signo): - return status - self.restart() - - -class ServiceWrapper(object): - def __init__(self, service, workers): - self.service = service - self.workers = workers - self.children = set() - self.forktimes = [] - - -class ProcessLauncher(object): - _signal_handlers_set = set() - - @classmethod - def _handle_class_signals(cls, *args, **kwargs): - for handler in cls._signal_handlers_set: - handler(*args, **kwargs) - - def __init__(self, wait_interval=0.01): - """Constructor. - - :param wait_interval: The interval to sleep for between checks - of child process exit. - """ - self.children = {} - self.sigcaught = None - self.running = True - self.wait_interval = wait_interval - rfd, self.writepipe = os.pipe() - self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') - self.handle_signal() - - def handle_signal(self): - self._signal_handlers_set.add(self._handle_signal) - _set_signals_handler(self._handle_class_signals) - - def _handle_signal(self, signo, frame): - self.sigcaught = signo - self.running = False - - # Allow the process to be killed again and die from natural causes - _set_signals_handler(signal.SIG_DFL) - - def _pipe_watcher(self): - # This will block until the write end is closed when the parent - # dies unexpectedly - self.readpipe.read(1) - - LOG.info(_LI('Parent process has died unexpectedly, exiting')) - - sys.exit(1) - - def _child_process_handle_signal(self): - # Setup child signal handlers differently - def _sighup(*args): - signal.signal(signal.SIGHUP, signal.SIG_DFL) - raise SignalExit(signal.SIGHUP) - - # Parent signals with SIGTERM when it wants us to go away. - signal.signal(signal.SIGTERM, signal.SIG_DFL) - if _sighup_supported(): - signal.signal(signal.SIGHUP, _sighup) - # Block SIGINT and let the parent send us a SIGTERM - signal.signal(signal.SIGINT, signal.SIG_IGN) - - def _child_wait_for_exit_or_signal(self, launcher): - status = 0 - signo = 0 - - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - try: - launcher.wait() - except SignalExit as exc: - signame = _signo_to_signame(exc.signo) - LOG.info(_LI('Child caught %s, exiting'), signame) - status = exc.code - signo = exc.signo - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_LE('Unhandled exception')) - status = 2 - finally: - launcher.stop() - - return status, signo - - def _child_process(self, service): - self._child_process_handle_signal() - - # Reopen the eventlet hub to make sure we don't share an epoll - # fd with parent and/or siblings, which would be bad - eventlet.hubs.use_hub() - - # Close write to ensure only parent has it open - os.close(self.writepipe) - # Create greenthread to watch for parent to close pipe - eventlet.spawn_n(self._pipe_watcher) - - # Reseed random number generator - random.seed() - - launcher = Launcher() - launcher.launch_service(service) - return launcher - - def _start_child(self, wrap): - if len(wrap.forktimes) > wrap.workers: - # Limit ourselves to one process a second (over the period of - # number of workers * 1 second). This will allow workers to - # start up quickly but ensure we don't fork off children that - # die instantly too quickly. - if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_LI('Forking too fast, sleeping')) - time.sleep(1) - - wrap.forktimes.pop(0) - - wrap.forktimes.append(time.time()) - - pid = os.fork() - if pid == 0: - launcher = self._child_process(wrap.service) - while True: - self._child_process_handle_signal() - status, signo = self._child_wait_for_exit_or_signal(launcher) - if not _is_sighup_and_daemon(signo): - break - launcher.restart() - - os._exit(status) - - LOG.info(_LI('Started child %d'), pid) - - wrap.children.add(pid) - self.children[pid] = wrap - - return pid - - def launch_service(self, service, workers=1): - wrap = ServiceWrapper(service, workers) - - LOG.info(_LI('Starting %d workers'), wrap.workers) - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - def _wait_child(self): - try: - # Don't block if no child processes have exited - pid, status = os.waitpid(0, os.WNOHANG) - if not pid: - return None - except OSError as exc: - if exc.errno not in (errno.EINTR, errno.ECHILD): - raise - return None - - if os.WIFSIGNALED(status): - sig = os.WTERMSIG(status) - LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), - dict(pid=pid, sig=sig)) - else: - code = os.WEXITSTATUS(status) - LOG.info(_LI('Child %(pid)s exited with status %(code)d'), - dict(pid=pid, code=code)) - - if pid not in self.children: - LOG.warning(_LW('pid %d not in child list'), pid) - return None - - wrap = self.children.pop(pid) - wrap.children.remove(pid) - return wrap - - def _respawn_children(self): - while self.running: - wrap = self._wait_child() - if not wrap: - # Yield to other threads if no children have exited - # Sleep for a short time to avoid excessive CPU usage - # (see bug #1095346) - eventlet.greenthread.sleep(self.wait_interval) - continue - while self.running and len(wrap.children) < wrap.workers: - self._start_child(wrap) - - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - - systemd.notify_once() - LOG.debug('Full set of CONF:') - CONF.log_opt_values(LOG, logging.DEBUG) - - try: - while True: - self.handle_signal() - self._respawn_children() - # No signal means that stop was called. Don't clean up here. - if not self.sigcaught: - return - - signame = _signo_to_signame(self.sigcaught) - LOG.info(_LI('Caught %s, stopping children'), signame) - if not _is_sighup_and_daemon(self.sigcaught): - break - - cfg.CONF.reload_config_files() - for service in set( - [wrap.service for wrap in self.children.values()]): - service.reset() - - for pid in self.children: - os.kill(pid, signal.SIGHUP) - - self.running = True - self.sigcaught = None - except eventlet.greenlet.GreenletExit: - LOG.info(_LI("Wait called after thread killed. Cleaning up.")) - - self.stop() - - def stop(self): - """Terminate child processes and wait on each.""" - self.running = False - for pid in self.children: - try: - os.kill(pid, signal.SIGTERM) - except OSError as exc: - if exc.errno != errno.ESRCH: - raise - - # Wait for children to die - if self.children: - LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) - while self.children: - self._wait_child() - - -class Service(object): - """Service object for binaries running on hosts.""" - - def __init__(self, threads=1000): - self.tg = threadgroup.ThreadGroup(threads) - - # signal that the service is done shutting itself down: - self._done = event.Event() - - def reset(self): - # NOTE(Fengqian): docs for Event.reset() recommend against using it - self._done = event.Event() - - def start(self): - pass - - def stop(self, graceful=False): - self.tg.stop(graceful) - self.tg.wait() - # Signal that service cleanup is done: - if not self._done.ready(): - self._done.send() - - def wait(self): - self._done.wait() - - -class Services(object): - - def __init__(self): - self.services = [] - self.tg = threadgroup.ThreadGroup() - self.done = event.Event() - - def add(self, service): - self.services.append(service) - self.tg.add_thread(self.run_service, service, self.done) - - def stop(self): - # wait for graceful shutdown of services: - for service in self.services: - service.stop() - service.wait() - - # Each service has performed cleanup, now signal that the run_service - # wrapper threads can now die: - if not self.done.ready(): - self.done.send() - - # reap threads: - self.tg.stop() - - def wait(self): - self.tg.wait() - - def restart(self): - self.stop() - self.done = event.Event() - for restart_service in self.services: - restart_service.reset() - self.tg.add_thread(self.run_service, restart_service, self.done) - - @staticmethod - def run_service(service, done): - """Service start wrapper. - - :param service: service to run - :param done: event to wait on until a shutdown is triggered - :returns: None - - """ - service.start() - done.wait() - - -def launch(service, workers=1): - if workers is None or workers == 1: - launcher = ServiceLauncher() - launcher.launch_service(service) - else: - launcher = ProcessLauncher() - launcher.launch_service(service, workers=workers) - - return launcher diff --git a/neutron/openstack/common/systemd.py b/neutron/openstack/common/systemd.py deleted file mode 100644 index 36243b342ab..00000000000 --- a/neutron/openstack/common/systemd.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 2012-2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Helper module for systemd service readiness notification. -""" - -import logging -import os -import socket -import sys - - -LOG = logging.getLogger(__name__) - - -def _abstractify(socket_name): - if socket_name.startswith('@'): - # abstract namespace socket - socket_name = '\0%s' % socket_name[1:] - return socket_name - - -def _sd_notify(unset_env, msg): - notify_socket = os.getenv('NOTIFY_SOCKET') - if notify_socket: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - sock.connect(_abstractify(notify_socket)) - sock.sendall(msg) - if unset_env: - del os.environ['NOTIFY_SOCKET'] - except EnvironmentError: - LOG.debug("Systemd notification failed", exc_info=True) - finally: - sock.close() - - -def notify(): - """Send notification to Systemd that service is ready. - - For details see - http://www.freedesktop.org/software/systemd/man/sd_notify.html - """ - _sd_notify(False, 'READY=1') - - -def notify_once(): - """Send notification once to Systemd that service is ready. - - Systemd sets NOTIFY_SOCKET environment variable with the name of the - socket listening for notifications from services. - This method removes the NOTIFY_SOCKET environment variable to ensure - notification is sent only once. - """ - _sd_notify(True, 'READY=1') - - -def onready(notify_socket, timeout): - """Wait for systemd style notification on the socket. - - :param notify_socket: local socket address - :type notify_socket: string - :param timeout: socket timeout - :type timeout: float - :returns: 0 service ready - 1 service not ready - 2 timeout occurred - """ - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.settimeout(timeout) - sock.bind(_abstractify(notify_socket)) - try: - msg = sock.recv(512) - except socket.timeout: - return 2 - finally: - sock.close() - if 'READY=1' in msg: - return 0 - else: - return 1 - - -if __name__ == '__main__': - # simple CLI for testing - if len(sys.argv) == 1: - notify() - elif len(sys.argv) >= 2: - timeout = float(sys.argv[1]) - notify_socket = os.getenv('NOTIFY_SOCKET') - if notify_socket: - retval = onready(notify_socket, timeout) - sys.exit(retval) diff --git a/neutron/openstack/common/threadgroup.py b/neutron/openstack/common/threadgroup.py deleted file mode 100644 index a3f74cabda7..00000000000 --- a/neutron/openstack/common/threadgroup.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import logging -import threading - -import eventlet -from eventlet import greenpool - -from neutron.openstack.common._i18n import _LE -from neutron.openstack.common import loopingcall - - -LOG = logging.getLogger(__name__) - - -def _thread_done(gt, *args, **kwargs): - """Callback function to be passed to GreenThread.link() when we spawn() - Calls the :class:`ThreadGroup` to notify if. - - """ - kwargs['group'].thread_done(kwargs['thread']) - - -class Thread(object): - """Wrapper around a greenthread, that holds a reference to the - :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when - it has done so it can be removed from the threads list. - """ - def __init__(self, thread, group): - self.thread = thread - self.thread.link(_thread_done, group=group, thread=self) - - def stop(self): - self.thread.kill() - - def wait(self): - return self.thread.wait() - - def link(self, func, *args, **kwargs): - self.thread.link(func, *args, **kwargs) - - -class ThreadGroup(object): - """The point of the ThreadGroup class is to: - - * keep track of timers and greenthreads (making it easier to stop them - when need be). - * provide an easy API to add timers. - """ - def __init__(self, thread_pool_size=10): - self.pool = greenpool.GreenPool(thread_pool_size) - self.threads = [] - self.timers = [] - - def add_dynamic_timer(self, callback, initial_delay=None, - periodic_interval_max=None, *args, **kwargs): - timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) - timer.start(initial_delay=initial_delay, - periodic_interval_max=periodic_interval_max) - self.timers.append(timer) - - def add_timer(self, interval, callback, initial_delay=None, - *args, **kwargs): - pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) - pulse.start(interval=interval, - initial_delay=initial_delay) - self.timers.append(pulse) - - def add_thread(self, callback, *args, **kwargs): - gt = self.pool.spawn(callback, *args, **kwargs) - th = Thread(gt, self) - self.threads.append(th) - return th - - def thread_done(self, thread): - self.threads.remove(thread) - - def _stop_threads(self): - current = threading.current_thread() - - # Iterate over a copy of self.threads so thread_done doesn't - # modify the list while we're iterating - for x in self.threads[:]: - if x is current: - # don't kill the current thread. - continue - try: - x.stop() - except eventlet.greenlet.GreenletExit: - pass - except Exception: - LOG.exception(_LE('Error stopping thread.')) - - def stop_timers(self): - for x in self.timers: - try: - x.stop() - except Exception: - LOG.exception(_LE('Error stopping timer.')) - self.timers = [] - - def stop(self, graceful=False): - """stop function has the option of graceful=True/False. - - * In case of graceful=True, wait for all threads to be finished. - Never kill threads. - * In case of graceful=False, kill threads immediately. - """ - self.stop_timers() - if graceful: - # In case of graceful=True, wait for all threads to be - # finished, never kill threads - self.wait() - else: - # In case of graceful=False(Default), kill threads - # immediately - self._stop_threads() - - def wait(self): - for x in self.timers: - try: - x.wait() - except eventlet.greenlet.GreenletExit: - pass - except Exception: - LOG.exception(_LE('Error waiting on ThreadGroup.')) - current = threading.current_thread() - - # Iterate over a copy of self.threads so thread_done doesn't - # modify the list while we're iterating - for x in self.threads[:]: - if x is current: - continue - try: - x.wait() - except eventlet.greenlet.GreenletExit: - pass - except Exception as ex: - LOG.exception(ex) diff --git a/neutron/plugins/hyperv/agent/l2_agent.py b/neutron/plugins/hyperv/agent/l2_agent.py index 5818b2f00aa..5b6a8f31dec 100644 --- a/neutron/plugins/hyperv/agent/l2_agent.py +++ b/neutron/plugins/hyperv/agent/l2_agent.py @@ -20,6 +20,7 @@ from hyperv.neutron import hyperv_neutron_agent from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc @@ -28,7 +29,6 @@ from neutron.common import rpc as n_rpc from neutron.common import topics from neutron import context from neutron.i18n import _LE -from neutron.openstack.common import loopingcall LOG = logging.getLogger(__name__) CONF = cfg.CONF diff --git a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py index f00c9aca484..a9827c52e14 100644 --- a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py +++ b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py @@ -25,6 +25,7 @@ eventlet.monkey_patch() from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall import six from neutron.agent.common import ovs_lib @@ -36,7 +37,6 @@ from neutron.common import topics from neutron.common import utils as n_utils from neutron.i18n import _LE, _LI from neutron import context -from neutron.openstack.common import loopingcall from neutron.plugins.ibm.common import constants diff --git a/neutron/plugins/ml2/drivers/cisco/apic/apic_sync.py b/neutron/plugins/ml2/drivers/cisco/apic/apic_sync.py index 08873d70219..fca4e2c1188 100644 --- a/neutron/plugins/ml2/drivers/cisco/apic/apic_sync.py +++ b/neutron/plugins/ml2/drivers/cisco/apic/apic_sync.py @@ -14,12 +14,12 @@ # under the License. from oslo_log import log +from oslo_service import loopingcall from neutron.common import constants as n_constants from neutron import context from neutron.i18n import _LW from neutron import manager -from neutron.openstack.common import loopingcall from neutron.plugins.ml2 import db as l2_db from neutron.plugins.ml2 import driver_context diff --git a/neutron/plugins/ml2/drivers/cisco/apic/apic_topology.py b/neutron/plugins/ml2/drivers/cisco/apic/apic_topology.py index d4901bcf84c..8a1be65a1b0 100644 --- a/neutron/plugins/ml2/drivers/cisco/apic/apic_topology.py +++ b/neutron/plugins/ml2/drivers/cisco/apic/apic_topology.py @@ -23,6 +23,8 @@ from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import periodic_task +from oslo_service import service as svc from neutron.agent.common import config from neutron.agent.linux import ip_lib @@ -33,8 +35,6 @@ from neutron.common import utils as neutron_utils from neutron.db import agents_db from neutron.i18n import _LE, _LI from neutron import manager -from neutron.openstack.common import periodic_task -from neutron.openstack.common import service as svc from neutron.plugins.ml2.drivers.cisco.apic import mechanism_apic as ma from neutron.plugins.ml2.drivers import type_vlan # noqa @@ -325,7 +325,7 @@ def launch(binary, manager, topic=None): server = service.Service.create( binary=binary, manager=manager, topic=topic, report_interval=report_period, periodic_interval=poll_period) - svc.launch(server).wait() + svc.launch(cfg.CONF, server).wait() def service_main(): diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index 66be308a29a..69da70c79bd 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -29,6 +29,8 @@ eventlet.monkey_patch() from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall +from oslo_service import service from six import moves from neutron.agent.linux import ip_lib @@ -42,8 +44,6 @@ from neutron.common import topics from neutron.common import utils as q_utils from neutron import context from neutron.i18n import _LE, _LI, _LW -from neutron.openstack.common import loopingcall -from neutron.openstack.common import service from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers.l2pop.rpc_manager \ import l2population_rpc as l2pop_rpc @@ -1055,7 +1055,7 @@ def main(): polling_interval, quitting_rpc_timeout) LOG.info(_LI("Agent initialized successfully, now running... ")) - launcher = service.launch(agent) + launcher = service.launch(cfg.CONF, agent) launcher.wait() diff --git a/neutron/plugins/ml2/drivers/mech_sriov/agent/sriov_nic_agent.py b/neutron/plugins/ml2/drivers/mech_sriov/agent/sriov_nic_agent.py index c98e94dc4a7..45124fd42c5 100644 --- a/neutron/plugins/ml2/drivers/mech_sriov/agent/sriov_nic_agent.py +++ b/neutron/plugins/ml2/drivers/mech_sriov/agent/sriov_nic_agent.py @@ -24,6 +24,7 @@ eventlet.monkey_patch() from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc @@ -33,7 +34,6 @@ from neutron.common import topics from neutron.common import utils as q_utils from neutron import context from neutron.i18n import _LE, _LI -from neutron.openstack.common import loopingcall from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa from neutron.plugins.ml2.drivers.mech_sriov.agent.common \ import exceptions as exc diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index ca52b216257..0c294638680 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -23,6 +23,7 @@ import netaddr from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall import six from six import moves @@ -40,7 +41,6 @@ from neutron.common import topics from neutron.common import utils as q_utils from neutron import context from neutron.i18n import _LE, _LI, _LW -from neutron.openstack.common import loopingcall from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc from neutron.plugins.ml2.drivers.openvswitch.agent.common \ diff --git a/neutron/service.py b/neutron/service.py index 76b0fd90d2e..ee8432dea50 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -22,6 +22,8 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging from oslo_messaging import server as rpc_server +from oslo_service import loopingcall +from oslo_service import service as common_service from oslo_utils import excutils from oslo_utils import importutils @@ -31,8 +33,6 @@ from neutron import context from neutron.db import api as session from neutron.i18n import _LE, _LI from neutron import manager -from neutron.openstack.common import loopingcall -from neutron.openstack.common import service as common_service from neutron import wsgi @@ -111,7 +111,7 @@ def serve_wsgi(cls): return service -class RpcWorker(object): +class RpcWorker(common_service.ServiceBase): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin @@ -161,7 +161,8 @@ def serve_rpc(): # be shared DB connections in child processes which may cause # DB errors. session.dispose() - launcher = common_service.ProcessLauncher(wait_interval=1.0) + launcher = common_service.ProcessLauncher(cfg.CONF, + wait_interval=1.0) launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers) return launcher except Exception: diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py index f30569e136e..7531f41e959 100644 --- a/neutron/services/metering/agents/metering_agent.py +++ b/neutron/services/metering/agents/metering_agent.py @@ -18,6 +18,9 @@ import time from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +from oslo_service import loopingcall +from oslo_service import periodic_task +from oslo_service import service from oslo_utils import importutils from neutron.agent.common import config @@ -30,9 +33,6 @@ from neutron.common import utils from neutron import context from neutron.i18n import _LE, _LI, _LW from neutron import manager -from neutron.openstack.common import loopingcall -from neutron.openstack.common import periodic_task -from neutron.openstack.common import service from neutron import service as neutron_service @@ -298,4 +298,4 @@ def main(): report_interval=cfg.CONF.AGENT.report_interval, manager='neutron.services.metering.agents.' 'metering_agent.MeteringAgentWithStateReport') - service.launch(server).wait() + service.launch(cfg.CONF, server).wait() diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index 910a834ba43..876bf8db424 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -261,7 +261,7 @@ class TestDhcpAgent(base.BaseTestCase): def test_dhcp_agent_main_agent_manager(self): logging_str = 'neutron.agent.common.config.setup_logging' - launcher_str = 'neutron.openstack.common.service.ServiceLauncher' + launcher_str = 'oslo_service.service.ServiceLauncher' with mock.patch(logging_str): with mock.patch.object(sys, 'argv') as sys_argv: with mock.patch(launcher_str) as launcher: @@ -269,7 +269,8 @@ class TestDhcpAgent(base.BaseTestCase): base.etcdir('neutron.conf')] entry.main() launcher.assert_has_calls( - [mock.call(), mock.call().launch_service(mock.ANY), + [mock.call(cfg.CONF), + mock.call().launch_service(mock.ANY), mock.call().wait()]) def test_run_completes_single_pass(self): diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index 0b28fd5993c..234e91cbe64 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -132,7 +132,7 @@ class BasicRouterOperationsFramework(base.BaseTestCase): l3pluginApi_cls.return_value = self.plugin_api self.looping_call_p = mock.patch( - 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall') + 'oslo_service.loopingcall.FixedIntervalLoopingCall') self.looping_call_p.start() subnet_id_1 = _uuid() diff --git a/neutron/tests/unit/agent/l3/test_dvr_local_router.py b/neutron/tests/unit/agent/l3/test_dvr_local_router.py index 2b5b5a6d366..51e89802f95 100644 --- a/neutron/tests/unit/agent/l3/test_dvr_local_router.py +++ b/neutron/tests/unit/agent/l3/test_dvr_local_router.py @@ -116,7 +116,7 @@ class TestDvrRouterOperations(base.BaseTestCase): l3pluginApi_cls.return_value = self.plugin_api self.looping_call_p = mock.patch( - 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall') + 'oslo_service.loopingcall.FixedIntervalLoopingCall') self.looping_call_p.start() subnet_id_1 = _uuid() diff --git a/neutron/tests/unit/agent/metadata/test_agent.py b/neutron/tests/unit/agent/metadata/test_agent.py index eaa5a773fe8..9bef96864c8 100644 --- a/neutron/tests/unit/agent/metadata/test_agent.py +++ b/neutron/tests/unit/agent/metadata/test_agent.py @@ -517,7 +517,7 @@ class TestUnixDomainMetadataProxy(base.BaseTestCase): self.cfg_p = mock.patch.object(agent, 'cfg') self.cfg = self.cfg_p.start() looping_call_p = mock.patch( - 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall') + 'oslo_service.loopingcall.FixedIntervalLoopingCall') self.looping_mock = looping_call_p.start() self.cfg.CONF.metadata_proxy_socket = '/the/path' self.cfg.CONF.metadata_workers = 0 diff --git a/neutron/tests/unit/plugins/ibm/test_sdnve_agent.py b/neutron/tests/unit/plugins/ibm/test_sdnve_agent.py index a170704c826..08d689e127d 100644 --- a/neutron/tests/unit/plugins/ibm/test_sdnve_agent.py +++ b/neutron/tests/unit/plugins/ibm/test_sdnve_agent.py @@ -68,7 +68,7 @@ class TestSdnveNeutronAgent(base.BaseTestCase): with mock.patch('neutron.plugins.ibm.agent.sdnve_neutron_agent.' 'SdnveNeutronAgent.setup_integration_br', return_value=mock.Mock()),\ - mock.patch('neutron.openstack.common.loopingcall.' + mock.patch('oslo_service.loopingcall.' 'FixedIntervalLoopingCall', new=MockFixedIntervalLoopingCall): self.agent = sdnve_neutron_agent.SdnveNeutronAgent(**kwargs) diff --git a/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_sync.py b/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_sync.py index b687f478883..47584710105 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_sync.py +++ b/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_sync.py @@ -22,7 +22,7 @@ sys.modules["apicapi"] = mock.Mock() from neutron.plugins.ml2.drivers.cisco.apic import apic_sync from neutron.tests import base -LOOPING_CALL = 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall' +LOOPING_CALL = 'oslo_service.loopingcall.FixedIntervalLoopingCall' GET_PLUGIN = 'neutron.manager.NeutronManager.get_plugin' GET_ADMIN_CONTEXT = 'neutron.context.get_admin_context' L2_DB = 'neutron.plugins.ml2.db.get_locked_port_and_binding' diff --git a/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_topology.py b/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_topology.py index 016f16453f2..292cb54e0ff 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_topology.py +++ b/neutron/tests/unit/plugins/ml2/drivers/cisco/apic/test_apic_topology.py @@ -28,7 +28,7 @@ NOTIFIER = ('neutron.plugins.ml2.drivers.cisco.apic.' 'apic_topology.ApicTopologyServiceNotifierApi') RPC_CONNECTION = 'neutron.common.rpc.Connection' AGENTS_DB = 'neutron.db.agents_db' -PERIODIC_TASK = 'neutron.openstack.common.periodic_task' +PERIODIC_TASK = 'oslo_service.periodic_task' DEV_EXISTS = 'neutron.agent.linux.ip_lib.device_exists' IP_DEVICE = 'neutron.agent.linux.ip_lib.IPDevice' EXECUTE = 'neutron.agent.linux.utils.execute' diff --git a/neutron/tests/unit/plugins/ml2/drivers/mech_sriov/agent/test_sriov_nic_agent.py b/neutron/tests/unit/plugins/ml2/drivers/mech_sriov/agent/test_sriov_nic_agent.py index 5e16c15b3a9..9729d3fb7be 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/mech_sriov/agent/test_sriov_nic_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/mech_sriov/agent/test_sriov_nic_agent.py @@ -43,7 +43,7 @@ class TestSriovAgent(base.BaseTestCase): def start(self, interval=0): self.f() - mock.patch('neutron.openstack.common.loopingcall.' + mock.patch('oslo_service.loopingcall.' 'FixedIntervalLoopingCall', new=MockFixedIntervalLoopingCall) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index b3ab4fa3efb..ebf02ec6c5a 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -124,8 +124,7 @@ class TestOvsNeutronAgent(object): return_value='00:00:00:00:00:01'),\ mock.patch( 'neutron.agent.common.ovs_lib.BaseOVS.get_bridges'),\ - mock.patch('neutron.openstack.common.loopingcall.' - 'FixedIntervalLoopingCall', + mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall', new=MockFixedIntervalLoopingCall),\ mock.patch( 'neutron.agent.common.ovs_lib.OVSBridge.' 'get_vif_ports', @@ -1292,7 +1291,7 @@ class TestOvsDvrNeutronAgent(object): return_value='00:00:00:00:00:01'),\ mock.patch( 'neutron.agent.common.ovs_lib.BaseOVS.get_bridges'),\ - mock.patch('neutron.openstack.common.loopingcall.' + mock.patch('oslo_service.loopingcall.' 'FixedIntervalLoopingCall', new=MockFixedIntervalLoopingCall),\ mock.patch( diff --git a/neutron/tests/unit/plugins/oneconvergence/test_nvsd_agent.py b/neutron/tests/unit/plugins/oneconvergence/test_nvsd_agent.py index 769bf4f421f..e840537bd0a 100644 --- a/neutron/tests/unit/plugins/oneconvergence/test_nvsd_agent.py +++ b/neutron/tests/unit/plugins/oneconvergence/test_nvsd_agent.py @@ -36,7 +36,7 @@ class TestOneConvergenceAgentBase(base.BaseTestCase): cfg.CONF.set_default('firewall_driver', 'neutron.agent.firewall.NoopFirewallDriver', group='SECURITYGROUP') - with mock.patch('neutron.openstack.common.loopingcall.' + with mock.patch('oslo_service.loopingcall.' 'FixedIntervalLoopingCall') as loopingcall: kwargs = {'integ_br': 'integration_bridge', 'polling_interval': 5} diff --git a/neutron/tests/unit/services/metering/agents/test_metering_agent.py b/neutron/tests/unit/services/metering/agents/test_metering_agent.py index 9b50dde1e27..11601873c51 100644 --- a/neutron/tests/unit/services/metering/agents/test_metering_agent.py +++ b/neutron/tests/unit/services/metering/agents/test_metering_agent.py @@ -67,7 +67,7 @@ class TestMeteringOperations(base.BaseTestCase): self.driver_patch.start() loopingcall_patch = mock.patch( - 'neutron.openstack.common.loopingcall.FixedIntervalLoopingCall') + 'oslo_service.loopingcall.FixedIntervalLoopingCall') loopingcall_patch.start() self.agent = metering_agent.MeteringAgent('my agent', cfg.CONF) @@ -172,7 +172,7 @@ class TestMeteringDriver(base.BaseTestCase): 'add_metering_label'}) def test_init_chain(self): - with mock.patch('neutron.openstack.common.' + with mock.patch('oslo_service.' 'periodic_task.PeriodicTasks.__init__') as init: metering_agent.MeteringAgent('my agent', cfg.CONF) - init.assert_called_once_with() + init.assert_called_once_with(cfg.CONF) diff --git a/neutron/tests/unit/test_wsgi.py b/neutron/tests/unit/test_wsgi.py index 0f94a14ca23..b64e03937aa 100644 --- a/neutron/tests/unit/test_wsgi.py +++ b/neutron/tests/unit/test_wsgi.py @@ -95,7 +95,7 @@ class TestWSGIServer(base.BaseTestCase): server.stop() server.wait() - @mock.patch('neutron.openstack.common.service.ProcessLauncher') + @mock.patch('oslo_service.service.ProcessLauncher') def test_start_multiple_workers(self, ProcessLauncher): launcher = ProcessLauncher.return_value diff --git a/neutron/wsgi.py b/neutron/wsgi.py index 2bde31369b4..a207c35d24f 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -31,6 +31,8 @@ import oslo_i18n from oslo_log import log as logging from oslo_log import loggers from oslo_serialization import jsonutils +from oslo_service import service as common_service +from oslo_service import systemd from oslo_utils import excutils import routes.middleware import six @@ -42,8 +44,6 @@ from neutron.common import exceptions as exception from neutron import context from neutron.db import api from neutron.i18n import _LE, _LI -from neutron.openstack.common import service as common_service -from neutron.openstack.common import systemd socket_opts = [ cfg.IntOpt('backlog', @@ -92,7 +92,7 @@ CONF.register_opts(socket_opts) LOG = logging.getLogger(__name__) -class WorkerService(object): +class WorkerService(common_service.ServiceBase): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, service, application): self._service = service @@ -248,7 +248,8 @@ class Server(object): # The API service runs in a number of child processes. # Minimize the cost of checking for child exit by extending the # wait interval past the default of 0.01s. - self._server = common_service.ProcessLauncher(wait_interval=1.0) + self._server = common_service.ProcessLauncher(cfg.CONF, + wait_interval=1.0) self._server.launch_service(service, workers=workers) @property diff --git a/openstack-common.conf b/openstack-common.conf index 12270514017..fbc952c8fae 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,17 +1,11 @@ [DEFAULT] # The list of modules to copy from oslo-incubator.git module=cache -module=eventlet_backdoor module=fileutils # The following module is not synchronized by update.sh script since it's # located in tools/ not neutron/openstack/common/. Left here to make it # explicit that we still ship code from incubator here #module=install_venv_common -module=loopingcall -module=periodic_task -module=service -module=systemd -module=threadgroup # The base module to hold the copy of openstack.common base=neutron diff --git a/requirements.txt b/requirements.txt index 9d4e1c260bd..0d9d0127996 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,6 +33,7 @@ oslo.middleware!=2.0.0,>=1.2.0 # Apache-2.0 oslo.policy>=0.5.0 # Apache-2.0 oslo.rootwrap>=2.0.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +oslo.service>=0.1.0 # Apache-2.0 oslo.utils>=1.6.0 # Apache-2.0 python-novaclient>=2.22.0