78b1263237
This patch adds support for long-running provider driver agents to the Octavia driver-agent. It will fork a process for all of the enabled provider driver agents at startup. Change-Id: Ib7042bcc48b1dd5b37b671dd5e64728b71ab9542 Story: 2006250 Task: 35863
169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
# Copyright 2018 Rackspace, US Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from functools import partial
|
|
import multiprocessing
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_reports import guru_meditation_report as gmr
|
|
import setproctitle
|
|
from stevedore import enabled as stevedore_enabled
|
|
|
|
from octavia.api.drivers.driver_agent import driver_listener
|
|
from octavia.common import service
|
|
from octavia import version
|
|
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
PROVIDER_AGENT_PROCESSES = []
|
|
|
|
|
|
def _mutate_config(*args, **kwargs):
    """Call ``CONF.mutate_config_files()`` on the global configuration.

    Any positional or keyword arguments are accepted and ignored, which
    lets this function be registered directly as a signal handler (the
    handler is invoked with the signal number and the current frame).
    """
    CONF.mutate_config_files()
|
|
|
|
|
|
def _handle_mutate_config(status_proc_pid, stats_proc_pid, *args, **kwargs):
    """Mutate the local config, then forward SIGHUP to two listeners.

    :param status_proc_pid: PID of the status listener process.
    :param stats_proc_pid: PID of the statistics listener process.

    Any further positional or keyword arguments (for example the signal
    number and frame supplied when this is used as a signal handler) are
    accepted and ignored.
    """
    LOG.info("Driver agent received HUP signal, mutating config.")
    _mutate_config()
    # Status listener first, then statistics listener.
    for listener_pid in (status_proc_pid, stats_proc_pid):
        os.kill(listener_pid, signal.SIGHUP)
|
|
|
|
|
|
def _check_if_provider_agent_enabled(extension):
    """Tell whether an extension is listed as an enabled provider agent.

    :param extension: Object exposing a ``name`` attribute.
    :returns: True when ``extension.name`` appears in the
              ``[driver_agent] enabled_provider_agents`` setting,
              False otherwise.
    """
    return extension.name in CONF.driver_agent.enabled_provider_agents
|
|
|
|
|
|
def _process_wrapper(exit_event, proc_name, function, agent_name=None):
    """Run ``function`` in the current process, restarting it on failure.

    SIGINT is ignored and SIGHUP is bound to ``_mutate_config`` before the
    process title is set. ``function(exit_event)`` is then called until it
    returns normally or ``exit_event`` is set; if it raises, the exception
    is logged and the call is retried after a one second pause.

    :param exit_event: Event checked before each (re)start and handed to
                       ``function``.
    :param proc_name: Name used in the process title and in log messages.
    :param function: Callable taking ``exit_event`` as its only argument.
    :param agent_name: Optional provider agent name; when given it is
                       appended to the process title and used in the
                       log messages instead of ``proc_name``.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGHUP, _mutate_config)

    if not agent_name:
        title = 'octavia-driver-agent - {}'.format(proc_name)
    else:
        title = 'octavia-driver-agent - {} -- {}'.format(
            proc_name, agent_name)
    setproctitle.setproctitle(title)

    while not exit_event.is_set():
        try:
            function(exit_event)
        except Exception as error:
            if agent_name:
                message = ('Provider agent "%s" raised exception: %s. '
                           'Restarting the "%s" provider agent.')
                subject = agent_name
            else:
                message = ('%s raised exception: %s. '
                           'Restarting %s.')
                subject = proc_name
            LOG.exception(message, subject, str(error), subject)
            # Brief pause so a persistently failing target does not spin.
            time.sleep(1)
        else:
            # A normal return means the target is done; do not restart it.
            break
|
|
|
|
|
|
def _start_provider_agents(exit_event):
    """Start one child process per enabled provider agent extension.

    Extensions are loaded from the 'octavia.driver_agent.provider_agents'
    namespace and filtered with ``_check_if_provider_agent_enabled``. Each
    process runs the extension plugin through ``_process_wrapper`` and is
    recorded in ``PROVIDER_AGENT_PROCESSES`` before it is started.

    :param exit_event: Event passed through to every provider agent.
    """
    manager = stevedore_enabled.EnabledExtensionManager(
        namespace='octavia.driver_agent.provider_agents',
        check_func=_check_if_provider_agent_enabled)

    for extension in manager:
        agent_name = extension.name
        agent_proc = multiprocessing.Process(
            name=agent_name, target=_process_wrapper,
            args=(exit_event, 'provider_agent', extension.plugin),
            kwargs={'agent_name': agent_name})
        # Track the process before starting it so shutdown can find it.
        PROVIDER_AGENT_PROCESSES.append(agent_proc)
        agent_proc.start()
        LOG.info('Started enabled provider agent: "%s" with PID: %d.',
                 agent_name, agent_proc.pid)
|
|
|
|
|
|
def main():
    """Entry point of the Octavia driver agent.

    Prepares the service, starts the status, statistics and get listener
    processes plus every enabled provider agent, installs the SIGTERM and
    SIGHUP handlers and then blocks until the three listener processes
    have exited.

    SIGTERM (or a KeyboardInterrupt in this process) sets the shared exit
    event and waits for the children. SIGHUP mutates the configuration in
    this process and forwards the signal to all three listener processes.
    """
    service.prepare_service(sys.argv)

    gmr.TextGuruMeditation.setup_autorun(version)

    processes = []
    exit_event = multiprocessing.Event()

    status_listener_proc = multiprocessing.Process(
        name='status_listener', target=_process_wrapper,
        args=(exit_event, 'status_listener', driver_listener.status_listener))
    processes.append(status_listener_proc)

    LOG.info("Driver agent status listener process starts:")
    status_listener_proc.start()

    stats_listener_proc = multiprocessing.Process(
        name='stats_listener', target=_process_wrapper,
        args=(exit_event, 'stats_listener', driver_listener.stats_listener))
    processes.append(stats_listener_proc)

    LOG.info("Driver agent statistics listener process starts:")
    stats_listener_proc.start()

    get_listener_proc = multiprocessing.Process(
        name='get_listener', target=_process_wrapper,
        args=(exit_event, 'get_listener', driver_listener.get_listener))
    processes.append(get_listener_proc)

    LOG.info("Driver agent get listener process starts:")
    get_listener_proc.start()

    _start_provider_agents(exit_event)

    def process_cleanup(*args, **kwargs):
        """Signal every child to exit and wait for them to finish."""
        LOG.info("Driver agent exiting due to signal.")
        exit_event.set()
        # The built-in listeners are waited on without a timeout.
        for listener_proc in processes:
            listener_proc.join()

        for proc in PROVIDER_AGENT_PROCESSES:
            LOG.info('Waiting up to %s seconds for provider agent "%s" to '
                     'shutdown.',
                     CONF.driver_agent.provider_agent_shutdown_timeout,
                     proc.name)
            try:
                proc.join(CONF.driver_agent.provider_agent_shutdown_timeout)
                if proc.exitcode is None:
                    # Still running after the timeout: force it down.
                    # TODO(johnsom) Change to proc.kill() once
                    #               python 3.7 or newer only
                    os.kill(proc.pid, signal.SIGKILL)
                    LOG.warning(
                        'Forcefully killed "%s" provider agent because it '
                        'failed to shutdown in %s seconds.', proc.name,
                        CONF.driver_agent.provider_agent_shutdown_timeout)
                else:
                    # Only report success when the agent really exited on
                    # its own; a killed agent must not be logged as a
                    # successful shutdown.
                    LOG.info('Provider agent "%s" has succesfully shutdown.',
                             proc.name)
            except Exception as e:
                LOG.warning('Unknown error "%s" while shutting down "%s", '
                            'ignoring and continuing shutdown process.',
                            str(e), proc.name)

    mutate_listeners = partial(
        _handle_mutate_config, status_listener_proc.pid,
        stats_listener_proc.pid)

    def handle_hup(*args, **kwargs):
        """Mutate config here and in all three listener processes."""
        mutate_listeners(*args, **kwargs)
        # _handle_mutate_config only signals the status and statistics
        # listeners, so the get listener is signaled explicitly here.
        # NOTE(review): provider agent processes are not signaled on HUP.
        os.kill(get_listener_proc.pid, signal.SIGHUP)

    signal.signal(signal.SIGTERM, process_cleanup)
    signal.signal(signal.SIGHUP, handle_hup)

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        process_cleanup()
|